Mike, it looks like you committed this work to master without the JIRA number in the log message? Seems like a mistake?
Or maybe I’m misinterpreting this push. The switch to git has increased the commit list volume many-fold... -- Steve www.lucidworks.com > On Feb 11, 2016, at 8:42 AM, [email protected] wrote: > > Repository: lucene-solr > Updated Branches: > refs/heads/master 35337e8cf -> 12b8721a4 > > > http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java > ---------------------------------------------------------------------- > diff --git > a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java > > b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java > new file mode 100644 > index 0000000..5a073ff > --- /dev/null > +++ > b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java > @@ -0,0 +1,1175 @@ > +package org.apache.lucene.replicator.nrt; > + > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +import java.io.BufferedReader; > +import java.io.Closeable; > +import java.io.IOException; > +import java.io.InputStreamReader; > +import java.io.UnsupportedEncodingException; > +import java.io.Writer; > +import java.net.InetAddress; > +import java.net.Socket; > +import java.net.SocketException; > +import java.nio.charset.MalformedInputException; > +import java.nio.charset.StandardCharsets; > +import java.nio.file.Files; > +import java.nio.file.Path; > +import java.nio.file.Paths; > +import java.nio.file.StandardOpenOption; > +import java.util.ArrayList; > +import java.util.Arrays; > +import java.util.Collections; > +import java.util.HashMap; > +import java.util.HashSet; > +import java.util.List; > +import java.util.Locale; > +import java.util.Map; > +import java.util.Set; > +import java.util.concurrent.ConcurrentHashMap; > +import java.util.concurrent.atomic.AtomicBoolean; > +import java.util.concurrent.atomic.AtomicInteger; > +import java.util.concurrent.atomic.AtomicLong; > +import java.util.concurrent.locks.Lock; > +import java.util.concurrent.locks.ReentrantLock; > +import java.util.regex.Pattern; > + > +import org.apache.lucene.analysis.MockAnalyzer; > +import org.apache.lucene.document.Document; > +import org.apache.lucene.document.Field; > +import org.apache.lucene.index.ConcurrentMergeScheduler; > +import org.apache.lucene.index.DirectoryReader; > +import org.apache.lucene.index.IndexWriter; > +import org.apache.lucene.index.IndexWriterConfig; > +import org.apache.lucene.index.SegmentInfos; > +import org.apache.lucene.index.Term; > +import org.apache.lucene.search.IndexSearcher; > +import org.apache.lucene.search.Query; > +import org.apache.lucene.search.ScoreDoc; > +import org.apache.lucene.search.TermQuery; > +import
org.apache.lucene.search.TopDocs; > +import org.apache.lucene.store.AlreadyClosedException; > +import org.apache.lucene.store.DataInput; > +import org.apache.lucene.store.DataOutput; > +import org.apache.lucene.store.Directory; > +import org.apache.lucene.store.InputStreamDataInput; > +import org.apache.lucene.store.MockDirectoryWrapper; > +import org.apache.lucene.store.NIOFSDirectory; > +import org.apache.lucene.store.OutputStreamDataOutput; > +import org.apache.lucene.store.RateLimiter; > +import org.apache.lucene.util.BytesRef; > +import org.apache.lucene.util.IOUtils; > +import org.apache.lucene.util.LineFileDocs; > +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; > +import org.apache.lucene.util.LuceneTestCase.SuppressSysoutChecks; > +import org.apache.lucene.util.LuceneTestCase; > +import org.apache.lucene.util.TestUtil; > +import org.apache.lucene.util.ThreadInterruptedException; > + > +import com.carrotsearch.randomizedtesting.SeedUtils; > + > +// nocommit sometimes have one replica be really slow at copying / have > random pauses (fake GC) / etc. > + > +// nocommit randomly p.destroy() one replica? > + > +/* > + TODO > + - why do we do the "rename temp to actual" all at the end...? what > really does that buy us? > + - replica should also track maxSegmentName its seen, and tap into > inflateGens if it's later promoted to primary? > + - test should not print scary exceptions and then succeed! > + - since all nodes are local, we could have a different test only impl > that just does local file copies instead of via tcp... > + - are the pre-copied-completed-merged files not being cleared in primary? > + - hmm the logic isn't right today? a replica may skip pulling a given > copy state, that recorded the finished merged segments? > + - beast & fix bugs > + - graceful cluster restart > + - better translog integration > + - get "graceful primary shutdown" working > + - there is still some global state we rely on for "correctness", e.g. > lastPrimaryVersion > + - clean up how version is persisted in commit data > + - why am i not using hashes here? how does ES use them? > + - get all other "single shard" functions working too: this cluster > should "act like" a single shard > + - SLM > + - controlled nrt reopen thread / returning long gen on write > + - live field values > + - add indexes > + - make cluster level APIs to search, index, that deal w/ primary > failover, etc. > + - must prune xlog > + - refuse to start primary unless we have quorum > + - later > + - if we named index files using segment's ID we wouldn't have file > name conflicts after primary crash / rollback? > + - back pressure on indexing if replicas can't keep up? > + - get xlog working on top? needs to be checkpointed, so we can > correlate IW ops to NRT reader version and prune xlog based on commit > + quorum > + - maybe fix IW to return "gen" or "seq id" or "segment name" or > something? > + - replica can copy files from other replicas too / use multicast / > rsync / something > + - each replica could also pre-open a SegmentReader after pre-copy when > warming a merge > + - we can pre-copy newly flushed files too, for cases where reopen rate > is low vs IW's flushing because RAM buffer is full > + - opto: pre-copy files as they are written; if they will become CFS, > we can build CFS on the replica? > + - what about multiple commit points? 
> + - fix primary to init directly from an open replica, instead of having > to commit/close the replica first > +*/ > + > +// Tricky cases: > +// - we are pre-copying a merge, then replica starts up part way through, > so it misses that pre-copy and must do it on next nrt point > +// - a down replica starts up, but it's "from the future" vs the current > primary, and must copy over file names with different contents > +// but referenced by its latest commit point, so it must fully remove > that commit ... which is a hazardous window > +// - replica comes up just as the primary is crashing / moving > +// - electing a new primary when a replica is just finishing its nrt sync: > we need to wait for it so we are sure to get the "most up to > +// date" replica > +// - replica comes up after merged segment finished so it doesn't copy > over the merged segment "promptly" (i.e. only sees it on NRT refresh) > + > +/** > + * Test case showing how to implement NRT replication. This test spawns a > sub-process per-node, running TestNRTReplicationChild. > + * > + * One node is primary, and segments are periodically flushed there, then > concurrently the N replica nodes copy the new files over and open new > readers, while > + * primary also opens a new reader. > + * > + * Nodes randomly crash and are restarted. If the primary crashes, a > replica is promoted. > + * > + * Merges are currently first finished on the primary and then pre-copied > out to replicas with a merged segment warmer so they don't block > + * ongoing NRT reopens. Probably replicas could do their own merging > instead, but this is more complex and may not be better overall > + * (merging takes a lot of IO resources). > + * > + * Slow network is simulated with a RateLimiter. > + */ > + > +// nocommit randomly delete all doc sometimes, 1) using IW.deleteAll and 2) > doing it inefficiently (by query, by id) > + > +// MockRandom's .sd file has no index header/footer: > +@SuppressCodecs({"MockRandom", "Memory", "Direct", "SimpleText"}) > +@SuppressSysoutChecks(bugUrl = "Stuff gets printed, important stuff for > debugging a failure") > +public class TestNRTReplication extends LuceneTestCase { > + > + // Test evilness controls: > + > + /** Randomly crash the current primary (losing data!) and promote the > "next best" replica. */ > + static final boolean DO_CRASH_PRIMARY = true; > + > + /** Randomly crash (JVM core dumps) a replica; it will later randomly be > restarted and sync itself. */ > + static final boolean DO_CRASH_REPLICA = true; > + > + /** Randomly gracefully close a replica; it will later be restarted and > sync itself. */ > + static final boolean DO_CLOSE_REPLICA = true; > + > + /** If false, all child + parent output is interleaved into single > stdout/err */ > + static final boolean SEPARATE_CHILD_OUTPUT = false; > + > + // nocommit DO_CLOSE_PRIMARY? > + > + /** Randomly crash whole cluster and then restart it */ > + static final boolean DO_FULL_CLUSTER_CRASH = true; > + > + /** True if we randomly flip a bit while copying files out */ > + static final boolean DO_BIT_FLIPS_DURING_COPY = true; > + > + /** Set to a non-null value to force exactly that many nodes; else, it's > random. 
*/ > + static final Integer NUM_NODES = null; > + > + static final boolean DO_RANDOM_XLOG_REPLAY = false; > + > + final AtomicBoolean failed = new AtomicBoolean(); > + > + final AtomicBoolean stop = new AtomicBoolean(); > + > + /** cwd where we start each child (server) node */ > + private Path childTempDir; > + > + long primaryGen; > + > + volatile long lastPrimaryVersion; > + > + volatile NodeProcess primary; > + volatile NodeProcess[] nodes; > + volatile long[] nodeTimeStamps; > + volatile boolean[] starting; > + > + Path[] indexPaths; > + > + Path transLogPath; > + SimpleTransLog transLog; > + final AtomicInteger markerUpto = new AtomicInteger(); > + > + /** Maps searcher version to how many hits the query body:the matched. */ > + final Map<Long,Integer> hitCounts = new ConcurrentHashMap<>(); > + > + /** Maps searcher version to how many marker documents matched. This > should only ever grow (we never delete marker documents). */ > + final Map<Long,Integer> versionToMarker = new ConcurrentHashMap<>(); > + > + /** Maps searcher version to xlog location when refresh of this version > started. */ > + final Map<Long,Long> versionToTransLogLocation = new ConcurrentHashMap<>(); > + > + public void test() throws Exception { > + > + Node.globalStartNS = System.nanoTime(); > + > + message("change thread name from " + Thread.currentThread().getName()); > + Thread.currentThread().setName("main"); > + > + childTempDir = createTempDir("child"); > + > + // We are parent process: > + > + // Silly bootstrapping: > + versionToTransLogLocation.put(0L, 0L); > + versionToTransLogLocation.put(1L, 0L); > + > + int numNodes; > + > + if (NUM_NODES == null) { > + numNodes = TestUtil.nextInt(random(), 2, 10); > + } else { > + numNodes = NUM_NODES.intValue(); > + } > + > + System.out.println("TEST: using " + numNodes + " nodes"); > + > + transLogPath = createTempDir("NRTReplication").resolve("translog"); > + transLog = new SimpleTransLog(transLogPath); > + > + //state.rateLimiters = new RateLimiter[numNodes]; > + indexPaths = new Path[numNodes]; > + nodes = new NodeProcess[numNodes]; > + nodeTimeStamps = new long[numNodes]; > + Arrays.fill(nodeTimeStamps, Node.globalStartNS); > + starting = new boolean[numNodes]; > + > + for(int i=0;i<numNodes;i++) { > + indexPaths[i] = createTempDir("index" + i); > + } > + > + Thread[] indexers = new Thread[TestUtil.nextInt(random(), 1, 3)]; > + System.out.println("TEST: launch " + indexers.length + " indexer > threads"); > + for(int i=0;i<indexers.length;i++) { > + indexers[i] = new IndexThread(); > + indexers[i].setName("indexer" + i); > + indexers[i].setDaemon(true); > + indexers[i].start(); > + } > + > + Thread[] searchers = new Thread[TestUtil.nextInt(random(), 1, 3)]; > + System.out.println("TEST: launch " + searchers.length + " searcher > threads"); > + for(int i=0;i<searchers.length;i++) { > + searchers[i] = new SearchThread(); > + searchers[i].setName("searcher" + i); > + searchers[i].setDaemon(true); > + searchers[i].start(); > + } > + > + Thread restarter = new RestartThread(); > + restarter.setName("restarter"); > + restarter.setDaemon(true); > + restarter.start(); > + > + int runTimeSec; > + if (TEST_NIGHTLY) { > + runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 120, 240); > + } else { > + runTimeSec = RANDOM_MULTIPLIER * TestUtil.nextInt(random(), 45, 120); > + } > + > + System.out.println("TEST: will run for " + runTimeSec + " sec"); > + > + long endTime = System.nanoTime() + runTimeSec*1000000000L; > + > + sendReplicasToPrimary(); > + > + while 
(failed.get() == false && System.nanoTime() < endTime) { > > // Wait a bit: > Thread.sleep(TestUtil.nextInt(random(), Math.min(runTimeSec*4, 200), > runTimeSec*4)); > if (primary != null && random().nextBoolean()) { > message("top: now flush primary"); > NodeProcess curPrimary = primary; > if (curPrimary != null) { > > // Save these before we start flush: > long nextTransLogLoc = transLog.getNextLocation(); > int markerUptoSav = markerUpto.get(); > > long result; > try { > result = primary.flush(); > } catch (Throwable t) { > message("top: flush failed; skipping: " + t.getMessage()); > result = -1; > } > if (result > 0) { > // There were changes > lastPrimaryVersion = result; > addTransLogLoc(lastPrimaryVersion, nextTransLogLoc); > addVersionMarker(lastPrimaryVersion, markerUptoSav); > } > } > } > > StringBuilder sb = new StringBuilder(); > int liveCount = 0; > for(int i=0;i<nodes.length;i++) { > NodeProcess node = nodes[i]; > if (node != null) { > if (sb.length() != 0) { > sb.append(" "); > } > liveCount++; > if (node.isPrimary) { > sb.append('P'); > } else { > sb.append('R'); > } > sb.append(i); > } > } > > message("PG=" + (primary == null ? "X" : primaryGen) + " " + liveCount > + " (of " + nodes.length + ") nodes running: " + sb); > > // Commit a random node, primary or replica > > { > NodeProcess node = nodes[random().nextInt(nodes.length)]; > if (node != null) { > // TODO: if this node is primary, it means we committed a > "partial" version (not exposed as an NRT point)... not sure it matters. > // maybe we somehow allow IW to commit a specific sis (the one we > just flushed)? > message("top: now commit node=" + node); > node.commitAsync(); > } > } > } > > message("TEST: top: test done, now close"); > stop.set(true); > for(Thread thread : indexers) { > thread.join(); > } > for(Thread thread : searchers) { > thread.join(); > } > restarter.join(); > > // Close replicas before primary so we cancel any in-progress > replications: > System.out.println("TEST: top: now close replicas"); > List<Closeable> toClose = new ArrayList<>(); > for(NodeProcess node : nodes) { > if (node != primary && node != null) { > toClose.add(node); > } > } > IOUtils.close(toClose); > IOUtils.close(primary); > IOUtils.close(transLog); > > if (failed.get() == false) { > message("TEST: top: now checkIndex"); > for(Path path : indexPaths) { > message("TEST: check " + path); > MockDirectoryWrapper dir = newMockFSDirectory(path); > // Just too slow otherwise > dir.setCrossCheckTermVectorsOnClose(false); > dir.close(); > } > } else { > message("TEST: failed; skip checkIndex"); > } > } > > private boolean anyNodesStarting() { > for(int id=0;id<nodes.length;id++) { > if (starting[id]) { > return true; > } > } > > return false; > } > > /** Picks a replica and promotes it as new primary. */ > private void promoteReplica() throws IOException { > message("top: primary crashed; now pick replica to promote"); > long maxSearchingVersion = -1; > NodeProcess replicaToPromote = null; > > // We must promote the most current replica, because otherwise file name > reuse can cause a replication to fail when it needs to copy > // over a file currently held open for searching.
This also minimizes > recovery work since the most current replica means less xlog > // replay to catch up: > for (NodeProcess node : nodes) { > if (node != null) { > message("ask " + node + " for its current searching version"); > long searchingVersion = node.getSearchingVersion(); > message(node + " has searchingVersion=" + searchingVersion); > if (searchingVersion > maxSearchingVersion) { > maxSearchingVersion = searchingVersion; > replicaToPromote = node; > } > } > } > > if (replicaToPromote == null) { > message("top: no replicas running; skipping primary promotion"); > return; > } > > message("top: promote " + replicaToPromote + " version=" + > maxSearchingVersion + "; now commit"); > if (replicaToPromote.commit() == false) { > message("top: commit failed; skipping primary promotion"); > return; > } > > message("top: now shutdown " + replicaToPromote); > if (replicaToPromote.shutdown() == false) { > message("top: shutdown failed for R" + replicaToPromote.id + "; > skipping primary promotion"); > return; > } > > int id = replicaToPromote.id; > message("top: now startPrimary " + replicaToPromote); > startPrimary(replicaToPromote.id); > } > > void startPrimary(int id) throws IOException { > message(id + ": top: startPrimary lastPrimaryVersion=" + > lastPrimaryVersion); > assert nodes[id] == null; > > // Force version of new primary to advance beyond where old primary was, > so we never re-use versions. It may have > // already advanced beyond newVersion, e.g. if it flushed new segments > during xlog replay: > > // First start node as primary (it opens an IndexWriter) but do not > publish it for searching until we replay xlog: > NodeProcess newPrimary = startNode(id, indexPaths[id], true, > lastPrimaryVersion+1); > if (newPrimary == null) { > message("top: newPrimary failed to start; abort"); > return; > } > > // Get xlog location that this node was guaranteed to already have > indexed through; this may replay some ops already indexed but it's OK > // because the ops are idempotent: we updateDocument (by docid) on > replay even for original addDocument: > Long startTransLogLoc; > Integer markerCount; > if (newPrimary.initCommitVersion == 0) { > startTransLogLoc = 0L; > markerCount = 0; > } else { > startTransLogLoc = > versionToTransLogLocation.get(newPrimary.initCommitVersion); > markerCount = versionToMarker.get(newPrimary.initCommitVersion); > } > assert startTransLogLoc != null: "newPrimary.initCommitVersion=" + > newPrimary.initCommitVersion + " is missing from versionToTransLogLocation: > keys=" + versionToTransLogLocation.keySet(); > assert markerCount != null: "newPrimary.initCommitVersion=" + > newPrimary.initCommitVersion + " is missing from versionToMarker: keys=" + > versionToMarker.keySet(); > > // When the primary starts, the userData in its latest commit point > tells us which version it had indexed up to, so we know where to > // replay from in the xlog. However, we forcefully advance the version, > and then IW on init (or maybe getReader) also adds 1 to it. > // Since we publish the primary in this state (before xlog replay is > done), a replica can start up at this point and pull this version, > and possibly later be chosen as a primary, causing problems if the > version is not yet recorded in the translog map.
So we record it > + // here: > + > + addTransLogLoc(newPrimary.initInfosVersion, startTransLogLoc); > + addVersionMarker(newPrimary.initInfosVersion, markerCount); > + > + assert newPrimary.initInfosVersion >= lastPrimaryVersion; > + message("top: now change lastPrimaryVersion from " + lastPrimaryVersion > + " to " + newPrimary.initInfosVersion); > + lastPrimaryVersion = newPrimary.initInfosVersion; > + > + // Publish new primary, before replaying xlog. This means other > indexing ops can come in at the same time as we catch up indexing > + // previous ops. Effectively, we have "forked" the indexing ops, by > rolling back in time a bit, and replaying old indexing ops (from > + // translog) concurrently with new incoming ops. > + nodes[id] = newPrimary; > + primary = newPrimary; > + > + sendReplicasToPrimary(); > + > + long nextTransLogLoc = transLog.getNextLocation(); > + int nextMarkerUpto = markerUpto.get(); > + message("top: replay trans log " + startTransLogLoc + " (version=" + > newPrimary.initCommitVersion + ") to " + nextTransLogLoc + " (translog end)"); > + try { > + transLog.replay(newPrimary, startTransLogLoc, nextTransLogLoc); > + } catch (IOException ioe) { > + message("top: replay xlog failed; abort"); > + return; > + } > + message("top: done replay trans log"); > + } > + > + final AtomicLong nodeStartCounter = new AtomicLong(); > + > + final Set<Integer> crashingNodes = Collections.synchronizedSet(new > HashSet<>()); > + > + /** Launches a child "server" (separate JVM), which is either primary or > replica node */ > + NodeProcess startNode(final int id, Path indexPath, boolean isPrimary, > long forcePrimaryVersion) throws IOException { > + nodeTimeStamps[id] = System.nanoTime(); > + List<String> cmd = new ArrayList<>(); > + > + NodeProcess curPrimary = primary; > + > + cmd.add(System.getProperty("java.home") > + + System.getProperty("file.separator") > + + "bin" > + + System.getProperty("file.separator") > + + "java"); > + cmd.add("-Xmx512m"); > + > + if (curPrimary != null) { > + cmd.add("-Dtests.nrtreplication.primaryTCPPort=" + curPrimary.tcpPort); > + } else if (isPrimary == false) { > + // We cannot start a replica when there is no primary: > + return null; > + } > + > + cmd.add("-Dtests.nrtreplication.node=true"); > + cmd.add("-Dtests.nrtreplication.nodeid=" + id); > + cmd.add("-Dtests.nrtreplication.startNS=" + Node.globalStartNS); > + cmd.add("-Dtests.nrtreplication.indexpath=" + indexPath); > + if (isPrimary) { > + cmd.add("-Dtests.nrtreplication.isPrimary=true"); > + cmd.add("-Dtests.nrtreplication.forcePrimaryVersion=" + > forcePrimaryVersion); > + } > + > + long myPrimaryGen = primaryGen; > + cmd.add("-Dtests.nrtreplication.primaryGen=" + myPrimaryGen); > + > + // Mixin our own counter because this is called from a fresh thread > which means the seed otherwise isn't changing each time we spawn a > + // new node: > + long seed = random().nextLong() * nodeStartCounter.incrementAndGet(); > + > + cmd.add("-Dtests.seed=" + SeedUtils.formatSeed(seed)); > + cmd.add("-ea"); > + cmd.add("-cp"); > + cmd.add(System.getProperty("java.class.path")); > + cmd.add("org.junit.runner.JUnitCore"); > + cmd.add(getClass().getName().replace(getClass().getSimpleName(), > "SimpleServer")); > + > + Writer childLog; > + > + if (SEPARATE_CHILD_OUTPUT) { > + Path childOut = childTempDir.resolve(id + ".log"); > + message("logging to " + childOut); > + childLog = Files.newBufferedWriter(childOut, StandardCharsets.UTF_8, > StandardOpenOption.APPEND, StandardOpenOption.CREATE); > + 
childLog.write("\n\nSTART NEW CHILD:\n"); > + } else { > + childLog = null; > + } > + > + message("child process command: " + cmd); > + ProcessBuilder pb = new ProcessBuilder(cmd); > + pb.redirectErrorStream(true); > + > + // Important, so that the scary looking hs_err_<pid>.log appear under > our test temp dir: > + pb.directory(childTempDir.toFile()); > + > + Process p = pb.start(); > + > + BufferedReader r; > + try { > + r = new BufferedReader(new InputStreamReader(p.getInputStream(), > IOUtils.UTF_8)); > + } catch (UnsupportedEncodingException uee) { > + throw new RuntimeException(uee); > + } > + > + int tcpPort = -1; > + long initCommitVersion = -1; > + long initInfosVersion = -1; > + Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*"); > + boolean willCrash = false; > + boolean sawExistingSegmentsFile = false; > + > + while (true) { > + String l = r.readLine(); > + if (l == null) { > + message("top: node=" + id + " failed to start"); > + try { > + p.waitFor(); > + } catch (InterruptedException ie) { > + throw new RuntimeException(ie); > + } > + message("exit value=" + p.exitValue()); > + > + // Hackity hack, in case primary crashed/closed and we haven't > noticed (reaped the process) yet: > + if (isPrimary == false) { > + if (sawExistingSegmentsFile) { > + // This means MDW's virus checker blocked us from deleting > segments_N that we must delete in order to start ... just return null > + // and retry again later: > + message("failed to remove segments_N; skipping"); > + return null; > + } > + for(int i=0;i<10;i++) { > + if (primaryGen != myPrimaryGen || primary == null) { > + // OK: primary crashed while we were trying to start, so it's > expected/allowed that we could not start the replica: > + message("primary crashed/closed while replica R" + id + " > tried to start; skipping"); > + return null; > + } else { > + try { > + Thread.sleep(10); > + } catch (InterruptedException ie) { > + throw new ThreadInterruptedException(ie); > + } > + } > + } > + } > + > + // Should fail the test: > + message("top: now fail test replica R" + id + " failed to start"); > + failed.set(true); > + throw new RuntimeException("replica R" + id + " failed to start"); > + } > + > + if (childLog != null) { > + childLog.write(l); > + childLog.write("\n"); > + childLog.flush(); > + } else if (logTimeStart.matcher(l).matches()) { > + // Already a well-formed log output: > + System.out.println(l); > + } else { > + message(l); > + } > + > + if (l.startsWith("PORT: ")) { > + tcpPort = Integer.parseInt(l.substring(6).trim()); > + } else if (l.startsWith("COMMIT VERSION: ")) { > + initCommitVersion = Integer.parseInt(l.substring(16).trim()); > + } else if (l.startsWith("INFOS VERSION: ")) { > + initInfosVersion = Integer.parseInt(l.substring(15).trim()); > + } else if (l.contains("will crash after")) { > + willCrash = true; > + } else if (l.startsWith("NODE STARTED")) { > + break; > + } else if (l.contains("replica cannot start: existing segments > file=")) { > + sawExistingSegmentsFile = true; > + } > + } > + > + final boolean finalWillCrash = willCrash; > + > + // Baby sits the child process, pulling its stdout and printing to our > stdout, calling nodeClosed once it exits: > + Thread pumper = ThreadPumper.start( > + new Runnable() { > + @Override > + public void run() { > + message("now wait for process " + > p); > + try { > + p.waitFor(); > + } catch (Throwable t) { > + throw new RuntimeException(t); > + } > + > + message("done wait for process " > + p); > + int exitValue = p.exitValue(); > + message("exit 
value=" + exitValue > + " willCrash=" + finalWillCrash); > + if (childLog != null) { > + try { > + childLog.write("process done; > exitValue=" + exitValue + "\n"); > + childLog.close(); > + } catch (IOException ioe) { > + throw new > RuntimeException(ioe); > + } > + } > + if (exitValue != 0 && > finalWillCrash == false && crashingNodes.remove(id) == false) { > + // should fail test > + failed.set(true); > + if (childLog != null) { > + throw new > RuntimeException("node " + id + " process had unexpected non-zero exit > status=" + exitValue + "; see " + childLog + " for details"); > + } else { > + throw new > RuntimeException("node " + id + " process had unexpected non-zero exit > status=" + exitValue); > + } > + } > + nodeClosed(id); > + } > + }, r, System.out, childLog); > + pumper.setName("pump" + id); > + > + message("top: node=" + id + " started at tcpPort=" + tcpPort + " > initCommitVersion=" + initCommitVersion + " initInfosVersion=" + > initInfosVersion); > + return new NodeProcess(p, id, tcpPort, pumper, isPrimary, > initCommitVersion, initInfosVersion); > + } > + > + private void nodeClosed(int id) { > + NodeProcess oldNode = nodes[id]; > + if (primary != null && oldNode == primary) { > + message("top: " + primary + ": primary process finished"); > + primary = null; > + primaryGen++; > + } else { > + message("top: " + oldNode + ": replica process finished"); > + } > + if (oldNode != null) { > + oldNode.isOpen = false; > + } > + nodes[id] = null; > + nodeTimeStamps[id] = System.nanoTime(); > + > + sendReplicasToPrimary(); > + } > + > + /** Sends currently alive replicas to primary, which uses this to know who > to notify when it does a refresh */ > + private void sendReplicasToPrimary() { > + NodeProcess curPrimary = primary; > + if (curPrimary != null) { > + List<NodeProcess> replicas = new ArrayList<>(); > + for (NodeProcess node : nodes) { > + if (node != null && node.isPrimary == false) { > + replicas.add(node); > + } > + } > + > + message("top: send " + replicas.size() + " replicas to primary"); > + > + try (Connection c = new Connection(curPrimary.tcpPort)) { > + c.out.writeByte(SimplePrimaryNode.CMD_SET_REPLICAS); > + c.out.writeVInt(replicas.size()); > + for(NodeProcess replica : replicas) { > + c.out.writeVInt(replica.id); > + c.out.writeVInt(replica.tcpPort); > + } > + c.flush(); > + c.in.readByte(); > + } catch (Throwable t) { > + message("top: ignore exc sending replicas to primary: " + t); > + } > + } > + } > + > + void addVersionMarker(long version, int count) { > + //System.out.println("ADD VERSION MARKER version=" + version + " count=" > + count); > + if (versionToMarker.containsKey(version)) { > + int curCount = versionToMarker.get(version); > + if (curCount != count) { > + message("top: wrong marker count version=" + version + " count=" + > count + " curCount=" + curCount); > + throw new IllegalStateException("version=" + version + " count=" + > count + " curCount=" + curCount); > + } > + } else { > + message("top: record marker count: version=" + version + " count=" + > count); > + versionToMarker.put(version, count); > + } > + } > + > + void addTransLogLoc(long version, long loc) { > + message("top: record transLogLoc: version=" + version + " loc=" + loc); > + versionToTransLogLocation.put(version, loc); > + } > + > + // Periodically wakes up and starts up any down nodes: > + private class RestartThread extends Thread { > + @Override > + public void run() { > + > + List<Thread> startupThreads = Collections.synchronizedList(new > ArrayList<>()); > + > + try { > + 
while (stop.get() == false) { > Thread.sleep(TestUtil.nextInt(random(), 50, 500)); > message("top: restarter cycle"); > > // Randomly crash full cluster: > if (DO_FULL_CLUSTER_CRASH && random().nextInt(50) == 17) { > message("top: full cluster crash"); > for(int i=0;i<nodes.length;i++) { > if (starting[i]) { > message("N" + i + ": top: wait for startup so we can > crash..."); > while (starting[i]) { > Thread.sleep(10); > } > message("N" + i + ": top: done wait for startup"); > } > NodeProcess node = nodes[i]; > if (node != null) { > crashingNodes.add(i); > message("top: N" + node.id + ": top: now crash node"); > node.crash(); > message("top: N" + node.id + ": top: done crash node"); > } > } > } > > List<Integer> downNodes = new ArrayList<>(); > StringBuilder b = new StringBuilder(); > long nowNS = System.nanoTime(); > for(int i=0;i<nodes.length;i++) { > b.append(' '); > double sec = (nowNS - nodeTimeStamps[i])/1000000000.0; > String prefix; > if (nodes[i] == null) { > downNodes.add(i); > if (starting[i]) { > prefix = "s"; > } else { > prefix = "x"; > } > } else { > prefix = ""; > } > if (primary != null && nodes[i] == primary) { > prefix += "p"; > } > b.append(String.format(Locale.ROOT, "%s%d(%.1fs)", prefix, i, > sec)); > } > message("node status" + b.toString()); > message("downNodes=" + downNodes); > > // If primary is down, promote a replica: > if (primary == null) { > if (anyNodesStarting()) { > message("top: skip promote replica: nodes are still starting"); > continue; > } > promoteReplica(); > } > > // Randomly start up a down replica: > > // Stop or start a replica > if (downNodes.isEmpty() == false) { > int idx = downNodes.get(random().nextInt(downNodes.size())); > if (starting[idx] == false) { > if (primary == null) { > if (downNodes.size() == nodes.length) { > // Cold start: entire cluster is down, start this node up > as the new primary > message("N" + idx + ": top: cold start as primary"); > startPrimary(idx); > } > } else if (random().nextDouble() < ((double) > downNodes.size())/nodes.length) { > // Start up replica: > starting[idx] = true; > message("N" + idx + ": top: start up: launch thread"); > Thread t = new Thread() { > @Override > public void run() { > try { > message("N" + idx + ": top: start up thread"); > nodes[idx] = startNode(idx, indexPaths[idx], false, > -1); > sendReplicasToPrimary(); > } catch (Throwable t) { > failed.set(true); > stop.set(true); > throw new RuntimeException(t); > } finally { > starting[idx] = false; > startupThreads.remove(Thread.currentThread()); > } > } > }; > t.setName("start R" + idx); > t.start(); > startupThreads.add(t); > } > } else { > message("node " + idx + " still starting"); > } > } > } > > System.out.println("Restarter: now stop: join " + > startupThreads.size() + " startup threads"); > > while (startupThreads.size() > 0) { > Thread.sleep(10); > } > > } catch (Throwable t) { > failed.set(true); > stop.set(true); > throw new RuntimeException(t); > } > } > } > > /** Randomly picks a node and runs a search against it */ > private class SearchThread extends Thread { > > @Override > public void run() { > // Maps version to number of hits for silly 'the' TermQuery: > Query theQuery = new TermQuery(new Term("body", "the")); > > // Persists connections > Map<Integer,Connection> connections
= new HashMap<>(); > + > + while (stop.get() == false) { > + NodeProcess node = nodes[random().nextInt(nodes.length)]; > + if (node == null || node.isOpen == false) { > + continue; > + } > + > + if (node.lock.tryLock() == false) { > + // Node is in the process of closing or crashing or something > + continue; > + } > + > + try { > + > + Thread.currentThread().setName("Searcher node=" + node); > + > + //System.out.println("S: cycle; conns=" + connections); > + > + Connection c = connections.get(node.id); > + > + long version; > + try { > + if (c == null) { > + //System.out.println("S: new connection " + node.id + " " + > Thread.currentThread().getName()); > + c = new Connection(node.tcpPort); > + connections.put(node.id, c); > + } else { > + //System.out.println("S: reuse connection " + node.id + " " + > Thread.currentThread().getName()); > + } > + > + c.out.writeByte(SimplePrimaryNode.CMD_SEARCH); > + c.flush(); > + > + while (c.sockIn.available() == 0) { > + if (stop.get()) { > + break; > + } > + if (node.isOpen == false) { > + throw new IOException("node closed"); > + } > + Thread.sleep(1); > + } > + version = c.in.readVLong(); > + > + while (c.sockIn.available() == 0) { > + if (stop.get()) { > + break; > + } > + if (node.isOpen == false) { > + throw new IOException("node closed"); > + } > + Thread.sleep(1); > + } > + int hitCount = c.in.readVInt(); > + > + Integer oldHitCount = hitCounts.get(version); > + > + // TODO: we never prune this map... > + if (oldHitCount == null) { > + hitCounts.put(version, hitCount); > + message("top: searcher: record search hitCount version=" + > version + " hitCount=" + hitCount + " node=" + node); > + } else { > + // Just ensure that all nodes show the same hit count for > + // the same version, i.e. they really are replicas of one > another: > + if (oldHitCount.intValue() != hitCount) { > + failed.set(true); > + stop.set(true); > + message("top: searcher: wrong version hitCount: version=" + > version + " oldHitCount=" + oldHitCount.intValue() + " hitCount=" + hitCount); > + fail("version=" + version + " oldHitCount=" + > oldHitCount.intValue() + " hitCount=" + hitCount); > + } > + } > + } catch (IOException ioe) { > + //message("top: searcher: ignore exc talking to node " + node + > ": " + ioe); > + //ioe.printStackTrace(System.out); > + IOUtils.closeWhileHandlingException(c); > + connections.remove(node.id); > + continue; > + } > + > + // This can be null if we got the new primary after crash and that > primary is still catching up (replaying xlog): > + Integer expectedAtLeastHitCount = versionToMarker.get(version); > + > + if (expectedAtLeastHitCount != null && expectedAtLeastHitCount > 0 > && random().nextInt(10) == 7) { > + try { > + c.out.writeByte(SimplePrimaryNode.CMD_MARKER_SEARCH); > + c.flush(); > + while (c.sockIn.available() == 0) { > + if (stop.get()) { > + break; > + } > + if (node.isOpen == false) { > + throw new IOException("node died"); > + } > + Thread.sleep(1); > + } > + > + version = c.in.readVLong(); > + > + while (c.sockIn.available() == 0) { > + if (stop.get()) { > + break; > + } > + if (node.isOpen == false) { > + throw new IOException("node died"); > + } > + Thread.sleep(1); > + } > + > + int hitCount = c.in.readVInt(); > + > + // Look for data loss: make sure all marker docs are visible: > + > + if (hitCount < expectedAtLeastHitCount) { > + > + String failMessage = "node=" + node + ": documents were lost > version=" + version + " hitCount=" + hitCount + " vs > expectedAtLeastHitCount=" + expectedAtLeastHitCount; > + 
message(failMessage); > + failed.set(true); > + stop.set(true); > + fail(failMessage); > + } > + } catch (IOException ioe) { > + //message("top: searcher: ignore exc talking to node " + node > + ": " + ioe); > + //throw new RuntimeException(ioe); > + //ioe.printStackTrace(System.out); > + IOUtils.closeWhileHandlingException(c); > + connections.remove(node.id); > + continue; > + } > + } > + > + Thread.sleep(10); > + > + } catch (Throwable t) { > + failed.set(true); > + stop.set(true); > + throw new RuntimeException(t); > + } finally { > + node.lock.unlock(); > + } > + } > + System.out.println("Searcher: now stop"); > + IOUtils.closeWhileHandlingException(connections.values()); > + } > + } > + > + private class IndexThread extends Thread { > + > + @Override > + public void run() { > + > + try { > + LineFileDocs docs = new LineFileDocs(random()); > + int docCount = 0; > + > + // How often we do an update/delete vs add: > + double updatePct = random().nextDouble(); > + > + // Varies how many docs/sec we index: > + int sleepChance = TestUtil.nextInt(random(), 4, 100); > + > + message("top: indexer: updatePct=" + updatePct + " sleepChance=" + > sleepChance); > + > + long lastTransLogLoc = transLog.getNextLocation(); > + > + NodeProcess curPrimary = null; > + Connection c = null; > + > + while (stop.get() == false) { > + > + try { > + while (stop.get() == false && curPrimary == null) { > + Thread.sleep(10); > + curPrimary = primary; > + if (curPrimary != null) { > + c = new Connection(curPrimary.tcpPort); > + c.out.writeByte(SimplePrimaryNode.CMD_INDEXING); > + break; > + } > + } > + > + if (stop.get()) { > + break; > + } > + > + Thread.currentThread().setName("indexer p" + curPrimary.id); > + > + if (random().nextInt(10) == 7) { > + // We use the marker docs to check for data loss in search > thread: > + Document doc = new Document(); > + int id = markerUpto.getAndIncrement(); > + String idString = "m"+id; > + doc.add(newStringField("docid", idString, Field.Store.YES)); > + doc.add(newStringField("marker", "marker", Field.Store.YES)); > + curPrimary.addOrUpdateDocument(c, doc, false); > + transLog.addDocument(idString, doc); > + message("index marker=" + idString + "; translog is " + > Node.bytesToString(Files.size(transLogPath))); > + } > + > + if (docCount > 0 && random().nextDouble() < updatePct) { > + int randomID = random().nextInt(docCount); > + String randomIDString = Integer.toString(randomID); > + if (random().nextBoolean()) { > + // Replace previous doc > + Document doc = docs.nextDoc(); > + ((Field) > doc.getField("docid")).setStringValue(randomIDString); > + curPrimary.addOrUpdateDocument(c, doc, true); > + transLog.updateDocument(randomIDString, doc); > + } else { > + // Delete previous doc > + curPrimary.deleteDocument(c, randomIDString); > + transLog.deleteDocuments(randomIDString); > + } > + } else { > + // Add new doc: > + Document doc = docs.nextDoc(); > + String idString = Integer.toString(docCount++); > + ((Field) doc.getField("docid")).setStringValue(idString); > + curPrimary.addOrUpdateDocument(c, doc, false); > + transLog.addDocument(idString, doc); > + > + if (DO_RANDOM_XLOG_REPLAY && random().nextInt(10) == 7) { > + long curLoc = transLog.getNextLocation(); > + // randomly replay chunks of translog just to test replay: > + message("now randomly replay translog from " + > lastTransLogLoc + " to " + curLoc); > + transLog.replay(curPrimary, lastTransLogLoc, curLoc); > + lastTransLogLoc = curLoc; > + } > + } > + } catch (IOException se) { > + // Assume primary crashed > + 
message("top: indexer lost connection to primary"); > + try { > + c.close(); > + } catch (Throwable t) { > + } > + curPrimary = null; > + c = null; > + } > + > + if (random().nextInt(sleepChance) == 0) { > + Thread.sleep(1); > + } > + > + if (random().nextInt(100) == 17) { > + System.out.println("Indexer: now pause for a bit..."); > + Thread.sleep(TestUtil.nextInt(random(), 500, 2000)); > + System.out.println("Indexer: done pause for a bit..."); > + } > + } > + if (curPrimary != null) { > + try { > + c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE); > + c.flush(); > + c.in.readByte(); > + } catch (IOException se) { > + // Assume primary crashed > + message("top: indexer lost connection to primary"); > + try { > + c.close(); > + } catch (Throwable t) { > + } > + curPrimary = null; > + c = null; > + } > + } > + System.out.println("Indexer: now stop"); > + } catch (Throwable t) { > + failed.set(true); > + stop.set(true); > + throw new RuntimeException(t); > + } > + } > + } > + > + static void message(String message) { > + long now = System.nanoTime(); > + System.out.println(String.format(Locale.ROOT, > + "%5.3fs : parent [%11s] %s", > + (now-Node.globalStartNS)/1000000000., > + Thread.currentThread().getName(), > + message)); > + } > +} > > http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java > ---------------------------------------------------------------------- > diff --git > a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java > > b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java > new file mode 100644 > index 0000000..6ddb777 > --- /dev/null > +++ > b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/ThreadPumper.java > @@ -0,0 +1,59 @@ > +package org.apache.lucene.replicator.nrt; > + > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +import java.io.BufferedReader; > +import java.io.IOException; > +import java.io.PrintStream; > +import java.io.Writer; > +import java.util.regex.Pattern; > + > +/** A pipe thread. It'd be nice to reuse guava's implementation for this... 
> */ > +class ThreadPumper { > + public static Thread start(final Runnable onExit, final BufferedReader > from, final PrintStream to, final Writer toFile) { > + Thread t = new Thread() { > + @Override > + public void run() { > + try { > + Pattern logTimeStart = Pattern.compile("^[0-9\\.]+s .*"); > + String line; > + while ((line = from.readLine()) != null) { > + if (toFile != null) { > + toFile.write(line); > + toFile.write("\n"); > + toFile.flush(); > + } else if (logTimeStart.matcher(line).matches()) { > + // Already a well-formed log output: > + System.out.println(line); > + } else { > + TestNRTReplication.message(line); > + } > + } > + // Sub-process finished > + } catch (IOException e) { > + System.err.println("ignore IOExc reading from forked process > pipe: " + e); > + } finally { > + onExit.run(); > + } > + } > + }; > + t.start(); > + return t; > + } > +} > + > > http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/replicator/test.cmd > ---------------------------------------------------------------------- > diff --git a/lucene/replicator/test.cmd b/lucene/replicator/test.cmd > new file mode 100644 > index 0000000..14e3bd2 > --- /dev/null > +++ b/lucene/replicator/test.cmd > @@ -0,0 +1 @@ > +python -u /l/util/src/python/repeatLuceneTest.py -tmpDir /b/tmp -logDir > /l/logs TestNRTReplication -jvms 1 -mult 4 -nightly > > http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/sandbox/src/test/org/apache/lucene/util/BaseGeoPointTestCase.java > ---------------------------------------------------------------------- > diff --git > a/lucene/sandbox/src/test/org/apache/lucene/util/BaseGeoPointTestCase.java > b/lucene/sandbox/src/test/org/apache/lucene/util/BaseGeoPointTestCase.java > index 4236e88..9f876ef 100644 > --- a/lucene/sandbox/src/test/org/apache/lucene/util/BaseGeoPointTestCase.java > +++ b/lucene/sandbox/src/test/org/apache/lucene/util/BaseGeoPointTestCase.java > @@ -637,7 +637,7 @@ public abstract class BaseGeoPointTestCase extends > LuceneTestCase { > for (int iter=0;iter<iters && failed.get() == false;iter++) { > > if (VERBOSE) { > - System.out.println("\nTEST: iter=" + iter + " s=" + s); > + System.out.println("\n" + Thread.currentThread().getName() + > ": TEST: iter=" + iter + " s=" + s); > } > Query query; > VerifyHits verifyHits; > > http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/test-framework/src/java/org/apache/lucene/index/BaseIndexFileFormatTestCase.java > ---------------------------------------------------------------------- > diff --git > a/lucene/test-framework/src/java/org/apache/lucene/index/BaseIndexFileFormatTestCase.java > > b/lucene/test-framework/src/java/org/apache/lucene/index/BaseIndexFileFormatTestCase.java > index b4b6f7d..68eed39 100644 > --- > a/lucene/test-framework/src/java/org/apache/lucene/index/BaseIndexFileFormatTestCase.java > +++ > b/lucene/test-framework/src/java/org/apache/lucene/index/BaseIndexFileFormatTestCase.java > @@ -457,7 +457,7 @@ abstract class BaseIndexFileFormatTestCase extends > LuceneTestCase { > if (random().nextBoolean()) { > DirectoryReader ir = null; > try { > - ir = DirectoryReader.open(iw, random().nextBoolean()); > + ir = DirectoryReader.open(iw, random().nextBoolean(), false); > dir.setRandomIOExceptionRateOnOpen(0.0); // disable > exceptions on openInput until next iteration > TestUtil.checkReader(ir); > } finally { > > 
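[Aside, since I'm already wading through this monster commit: the extra boolean that the test-framework hunks above and below thread through DirectoryReader.open() and RandomIndexWriter.getReader() looks like a new writeAllDeletes flag. If I'm reading the diff right, the call shape is now roughly this (my own sketch, not code from the patch, and the semantics are my guess):

    Directory dir = newDirectory();
    IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(new MockAnalyzer(random())));
    w.addDocument(new Document());
    // applyAllDeletes=true: resolve buffered deletes in the returned NRT reader;
    // writeAllDeletes=false: don't also force the live-docs files out to disk first
    DirectoryReader r = DirectoryReader.open(w, true, false);
    IOUtils.close(r, w, dir);

Presumably NRT replication is why the flag exists: a replica copies raw index files, so the primary would need writeAllDeletes=true when it cuts an NRT point, otherwise the copied files wouldn't reflect deletes that only live in the primary's in-memory reader.]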
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java > ---------------------------------------------------------------------- > diff --git > a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java > > b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java > index 047ef4b..22fee48 100644 > --- > a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java > +++ > b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java > @@ -303,7 +303,7 @@ public class RandomIndexWriter implements Closeable { > > public DirectoryReader getReader() throws IOException { > LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); > - return getReader(true); > + return getReader(true, false); > } > > private boolean doRandomForceMerge = true; > @@ -353,7 +353,7 @@ public class RandomIndexWriter implements Closeable { > } > } > > - public DirectoryReader getReader(boolean applyDeletions) throws > IOException { > + public DirectoryReader getReader(boolean applyDeletions, boolean > writeAllDeletes) throws IOException { > LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig()); > getReaderCalled = true; > if (r.nextInt(20) == 2) { > @@ -366,7 +366,7 @@ public class RandomIndexWriter implements Closeable { > if (r.nextInt(5) == 1) { > w.commit(); > } > - return w.getReader(applyDeletions); > + return w.getReader(applyDeletions, writeAllDeletes); > } else { > if (LuceneTestCase.VERBOSE) { > System.out.println("RIW.getReader: open new reader"); > @@ -375,7 +375,7 @@ public class RandomIndexWriter implements Closeable { > if (r.nextBoolean()) { > return DirectoryReader.open(w.getDirectory()); > } else { > - return w.getReader(applyDeletions); > + return w.getReader(applyDeletions, writeAllDeletes); > } > } > } > > http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java > ---------------------------------------------------------------------- > diff --git > a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java > > b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java > index ec99c7e..b19045b 100644 > --- > a/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java > +++ > b/lucene/test-framework/src/java/org/apache/lucene/store/MockDirectoryWrapper.java > @@ -38,8 +38,10 @@ import java.util.TreeSet; > import java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.ConcurrentMap; > import java.util.concurrent.atomic.AtomicInteger; > +import java.util.regex.Matcher; > > import org.apache.lucene.index.DirectoryReader; > +import org.apache.lucene.index.IndexFileNames; > import org.apache.lucene.index.IndexWriter; > import org.apache.lucene.index.IndexWriterConfig; > import org.apache.lucene.index.NoDeletionPolicy; > @@ -239,12 +241,24 @@ public class MockDirectoryWrapper extends > BaseDirectoryWrapper { > > if (openFiles.containsKey(source)) { > if (assertNoDeleteOpenFile) { > - throw (AssertionError) fillOpenTrace(new > AssertionError("MockDirectoryWrapper: file \"" + source + "\" is still open: > cannot rename"), source, true); > + throw (AssertionError) fillOpenTrace(new > AssertionError("MockDirectoryWrapper: source file \"" + source + "\" is still > open: cannot rename"), source, true); > } else if (noDeleteOpenFile) { > - throw 
(IOException) fillOpenTrace(new > IOException("MockDirectoryWrapper: file \"" + source + "\" is still open: > cannot rename"), source, true); > + throw (IOException) fillOpenTrace(new > IOException("MockDirectoryWrapper: source file \"" + source + "\" is still > open: cannot rename"), source, true); > } > } > > + if (openFiles.containsKey(dest)) { > + if (assertNoDeleteOpenFile) { > + throw (AssertionError) fillOpenTrace(new > AssertionError("MockDirectoryWrapper: dest file \"" + dest + "\" is still > open: cannot rename"), dest, true); > + } else if (noDeleteOpenFile) { > + throw (IOException) fillOpenTrace(new > IOException("MockDirectoryWrapper: dest file \"" + dest + "\" is still open: > cannot rename"), dest, true); > + } > + } > + > + if (createdFiles.contains(dest)) { > + throw new IOException("MockDirectoryWrapper: dest file \"" + dest + > "\" already exists: cannot rename"); > + } > + > boolean success = false; > try { > in.renameFile(source, dest); > @@ -257,6 +271,8 @@ public class MockDirectoryWrapper extends > BaseDirectoryWrapper { > unSyncedFiles.add(dest); > } > openFilesDeleted.remove(source); > + triedToDelete.remove(dest); > + createdFiles.add(dest); > } > } > } > @@ -278,89 +294,215 @@ public class MockDirectoryWrapper extends > BaseDirectoryWrapper { > } > } > > - /** Simulates a crash of OS or machine by overwriting > - * unsynced files. */ > - public synchronized void crash() throws IOException { > - crashed = true; > - openFiles = new HashMap<>(); > - openFilesForWrite = new HashSet<>(); > - openFilesDeleted = new HashSet<>(); > - Iterator<String> it = unSyncedFiles.iterator(); > - unSyncedFiles = new HashSet<>(); > - // first force-close all files, so we can corrupt on windows etc. > - // clone the file map, as these guys want to remove themselves on close. 
> -    Map<Closeable,Exception> m = new IdentityHashMap<>(openFileHandles);
> -    for (Closeable f : m.keySet()) {
> -      try {
> -        f.close();
> -      } catch (Exception ignored) {}
> +  public synchronized void corruptUnknownFiles() throws IOException {
> +
> +    System.out.println("MDW: corrupt unknown files");
> +    Set<String> knownFiles = new HashSet<>();
> +    for(String fileName : listAll()) {
> +      if (fileName.startsWith(IndexFileNames.SEGMENTS)) {
> +        System.out.println("MDW: read " + fileName + " to gather files it references");
> +        knownFiles.addAll(SegmentInfos.readCommit(this, fileName).files(true));
> +      }
>      }
> -
> -    while(it.hasNext()) {
> -      String name = it.next();
> -      int damage = randomState.nextInt(5);
> +
> +    Set<String> toCorrupt = new HashSet<>();
> +    Matcher m = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
> +    for(String fileName : listAll()) {
> +      m.reset(fileName);
> +      if (knownFiles.contains(fileName) == false &&
> +          fileName.endsWith("write.lock") == false &&
> +          (m.matches() || fileName.startsWith(IndexFileNames.PENDING_SEGMENTS))) {
> +        toCorrupt.add(fileName);
> +      }
> +    }
> +
> +    corruptFiles(toCorrupt);
> +  }
> +
> +  public synchronized void corruptFiles(Collection<String> files) {
> +    // Must make a copy because we change the incoming unsyncedFiles
> +    // when we create temp files, delete, etc., below:
> +    for(String name : new ArrayList<>(files)) {
> +      int damage = randomState.nextInt(6);
>        String action = null;
>
> -      if (damage == 0) {
> +      switch(damage) {
> +
> +      case 0:
>          action = "deleted";
> -        deleteFile(name, true);
> -      } else if (damage == 1) {
> +        try {
> +          deleteFile(name, true);
> +        } catch (IOException ioe) {
> +          // ignore
> +        }
> +        break;
> +
> +      case 1:
>          action = "zeroed";
>          // Zero out file entirely
> -        long length = fileLength(name);
> +        long length;
> +        try {
> +          length = fileLength(name);
> +        } catch (IOException ioe) {
> +          // Ignore
> +          continue;
> +        }
>          byte[] zeroes = new byte[256];
>          long upto = 0;
> -        IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState));
> -        while(upto < length) {
> -          final int limit = (int) Math.min(length-upto, zeroes.length);
> -          out.writeBytes(zeroes, 0, limit);
> -          upto += limit;
> +        try (IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState))) {
> +          while(upto < length) {
> +            final int limit = (int) Math.min(length-upto, zeroes.length);
> +            out.writeBytes(zeroes, 0, limit);
> +            upto += limit;
> +          }
> +        } catch (IOException ioe) {
> +          // ignore
>          }
> -        out.close();
> -      } else if (damage == 2) {
> -        action = "partially truncated";
> -        // Partially Truncate the file:
> -
> -        // First, make temp file and copy only half this
> -        // file over:
> -        String tempFileName;
> -        while (true) {
> -          tempFileName = ""+randomState.nextInt();
> -          if (!LuceneTestCase.slowFileExists(in, tempFileName)) {
> -            break;
> +        break;
> +
> +      case 2:
> +        {
> +          action = "partially truncated";
> +          // Partially Truncate the file:
> +
> +          // First, make temp file and copy only half this
> +          // file over:
> +          String tempFileName = null;
> +          try (IndexOutput tempOut = in.createTempOutput("name", "mdw_corrupt", LuceneTestCase.newIOContext(randomState));
> +               IndexInput ii = in.openInput(name, LuceneTestCase.newIOContext(randomState))) {
> +            tempFileName = tempOut.getName();
> +            tempOut.copyBytes(ii, ii.length()/2);
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +
> +          try {
> +            // Delete original and copy bytes back:
> +            deleteFile(name, true);
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +
> +          try (IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState));
> +               IndexInput ii = in.openInput(tempFileName, LuceneTestCase.newIOContext(randomState))) {
> +            out.copyBytes(ii, ii.length());
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +          try {
> +            deleteFile(tempFileName, true);
> +          } catch (IOException ioe) {
> +            // ignore
>            }
>          }
> -        final IndexOutput tempOut = in.createOutput(tempFileName, LuceneTestCase.newIOContext(randomState));
> -        IndexInput ii = in.openInput(name, LuceneTestCase.newIOContext(randomState));
> -        tempOut.copyBytes(ii, ii.length()/2);
> -        tempOut.close();
> -        ii.close();
> -
> -        // Delete original and copy bytes back:
> -        deleteFile(name, true);
> -
> -        final IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState));
> -        ii = in.openInput(tempFileName, LuceneTestCase.newIOContext(randomState));
> -        out.copyBytes(ii, ii.length());
> -        out.close();
> -        ii.close();
> -        deleteFile(tempFileName, true);
> -      } else if (damage == 3) {
> +        break;
> +
> +      case 3:
>          // The file survived intact:
>          action = "didn't change";
> -      } else {
> +        break;
> +
> +      case 4:
> +        // Corrupt one bit randomly in the file:
> +
> +        {
> +
> +          String tempFileName = null;
> +          try (IndexOutput tempOut = in.createTempOutput("name", "mdw_corrupt", LuceneTestCase.newIOContext(randomState));
> +               IndexInput ii = in.openInput(name, LuceneTestCase.newIOContext(randomState))) {
> +            tempFileName = tempOut.getName();
> +            if (ii.length() > 0) {
> +              // Copy first part unchanged:
> +              long byteToCorrupt = (long) (randomState.nextDouble() * ii.length());
> +              if (byteToCorrupt > 0) {
> +                tempOut.copyBytes(ii, byteToCorrupt);
> +              }
> +
> +              // Randomly flip one bit from this byte:
> +              byte b = ii.readByte();
> +              int bitToFlip = randomState.nextInt(8);
> +              b = (byte) (b ^ (1 << bitToFlip));
> +              tempOut.writeByte(b);
> +
> +              action = "flip bit " + bitToFlip + " of byte " + byteToCorrupt + " out of " + ii.length() + " bytes";
> +
> +              // Copy last part unchanged:
> +              long bytesLeft = ii.length() - byteToCorrupt - 1;
> +              if (bytesLeft > 0) {
> +                tempOut.copyBytes(ii, bytesLeft);
> +              }
> +            } else {
> +              action = "didn't change";
> +            }
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +
> +          try {
> +            // Delete original and copy bytes back:
> +            deleteFile(name, true);
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +
> +          try (IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState));
> +               IndexInput ii = in.openInput(tempFileName, LuceneTestCase.newIOContext(randomState))) {
> +            out.copyBytes(ii, ii.length());
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +          try {
> +            deleteFile(tempFileName, true);
> +          } catch (IOException ioe) {
> +            // ignore
> +          }
> +        }
> +        break;
> +
> +      case 5:
>          action = "fully truncated";
>          // Totally truncate the file to zero bytes
> -        deleteFile(name, true);
> -        IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState));
> -        out.close();
> +        try {
> +          deleteFile(name, true);
> +        } catch (IOException ioe) {
> +          // ignore
> +        }
> +
> +        try (IndexOutput out = in.createOutput(name, LuceneTestCase.newIOContext(randomState))) {
> +        } catch (IOException ioe) {
> +          // ignore
> +        }
> +        break;
> +
> +      default:
> +        throw new AssertionError();
>        }
> -      if (LuceneTestCase.VERBOSE) {
> +
> +      if (true || LuceneTestCase.VERBOSE) {
>          System.out.println("MockDirectoryWrapper: " + action + " unsynced file: " + name);
>        }
>      }
>    }
>
> +  /** Simulates a crash of OS or machine by overwriting
> +   *  unsynced files. */
> +  public synchronized void crash() {
> +    crashed = true;
> +    openFiles = new HashMap<>();
> +    openFilesForWrite = new HashSet<>();
> +    openFilesDeleted = new HashSet<>();
> +    // first force-close all files, so we can corrupt on windows etc.
> +    // clone the file map, as these guys want to remove themselves on close.
> +    Map<Closeable,Exception> m = new IdentityHashMap<>(openFileHandles);
> +    for (Closeable f : m.keySet()) {
> +      try {
> +        f.close();
> +      } catch (Exception ignored) {}
> +    }
> +    corruptFiles(unSyncedFiles);
> +    unSyncedFiles = new HashSet<>();
> +  }
> +
>    public synchronized void clearCrash() {
>      crashed = false;
>      openLocks.clear();
> @@ -520,9 +662,9 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
>      if (!forced && enableVirusScanner && (randomState.nextInt(4) == 0)) {
>        triedToDelete.add(name);
>        if (LuceneTestCase.VERBOSE) {
> -        System.out.println("MDW: now refuse to delete file: " + name);
> +        System.out.println(Thread.currentThread().getName() + ": MDW: now refuse to delete file: " + name + " this=" + this);
>        }
> -      throw new IOException("cannot delete file: " + name + ", a virus scanner has it open");
> +      throw new IOException("cannot delete file: " + name + ", a virus scanner has it open (exists?=" + LuceneTestCase.slowFileExists(in, name));
>      }
>      triedToDelete.remove(name);
>      in.deleteFile(name);
> @@ -571,6 +713,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
>
>      unSyncedFiles.add(name);
>      createdFiles.add(name);
> +    triedToDelete.remove(name);
>
>      if (in instanceof RAMDirectory) {
>        RAMDirectory ramdir = (RAMDirectory) in;
> @@ -801,7 +944,11 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
>        IndexWriterConfig iwc = new IndexWriterConfig(null);
>        iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
>        new IndexWriter(in, iwc).rollback();
> -      String[] endFiles = in.listAll();
> +
> +      Set<String> files = new HashSet<>(Arrays.asList(listAll()));
> +      // Disregard what happens with the pendingDeletions files:
> +      files.removeAll(pendingDeletions);
> +      String[] endFiles = files.toArray(new String[0]);
>
>        Set<String> startSet = new TreeSet<>(Arrays.asList(startFiles));
>        Set<String> endSet = new TreeSet<>(Arrays.asList(endFiles));
> @@ -839,7 +986,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
>              assert pendingDeletions.contains(s);
>              if (LuceneTestCase.VERBOSE) {
>                System.out.println("MDW: Unreferenced check: Ignoring referenced file: " + s + " " +
> -                "from " + file + " that we could not delete.");
> +                                 "from " + file + " that we could not delete.");
>              }
>              startSet.add(s);
>            }
> @@ -884,7 +1031,7 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
>              extras += "\n\nThese files we had previously tried to delete, but couldn't: " + pendingDeletions;
>            }
>
> -          throw new RuntimeException("unreferenced files: before delete:\n    " + Arrays.toString(startFiles) + "\n  after delete:\n    " + Arrays.toString(endFiles) + extras);
> +          throw new RuntimeException(this + ": unreferenced files: before delete:\n    " + Arrays.toString(startFiles) + "\n  after delete:\n    " + Arrays.toString(endFiles) + extras);
>          }
>
>          DirectoryReader ir1 = DirectoryReader.open(this);
> @@ -1036,7 +1183,6 @@ public class MockDirectoryWrapper extends BaseDirectoryWrapper {
>      }
>    }
>
> -
>    // don't override optional methods like copyFrom: we need the default impl for things like disk
>    // full checks.  we randomly exercise "raw" directories anyway. We ensure default impls are used:
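
A few notes on the hunks above and below, with small standalone sketches.

First, the new corruptUnknownFiles(): it reads every segments_N commit to learn which files are referenced, then hands any unreferenced codec-pattern file (write.lock excepted) to corruptFiles(). For readers who haven't met IndexFileNames.CODEC_FILE_PATTERN, here is a tiny demo of what that pattern matches. The pattern is inlined so the demo has no Lucene dependency (IndexFileNames remains the authoritative definition), and CodecFilePatternDemo is an invented name:

  import java.util.regex.Pattern;

  public class CodecFilePatternDemo {

    // Inlined copy of the shape of IndexFileNames.CODEC_FILE_PATTERN;
    // check the real class for the authoritative definition:
    static final Pattern CODEC_FILE_PATTERN = Pattern.compile("_[a-z0-9]+(_.*)?\\..*");

    public static void main(String[] args) {
      // Per-segment files match; segments_N and write.lock do not,
      // which is why the patch handles those names separately:
      for (String name : new String[] {"_0.cfs", "_0_1.liv", "segments_2", "write.lock"}) {
        System.out.println(name + " -> " + CODEC_FILE_PATTERN.matcher(name).matches());
      }
    }
  }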
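
The heart of the new corruptFiles() switch is case 4, which flips one randomly chosen bit in one byte of an unsynced file: the smallest damage that still trips Lucene's checksum verification. Below is a minimal sketch of the same idea over a plain byte array, rather than the IndexInput/IndexOutput streaming the patch uses; BitFlipDemo and flipOneBit are invented names:

  import java.nio.charset.StandardCharsets;
  import java.util.Random;

  public class BitFlipDemo {

    // Returns a copy of "file" with exactly one randomly chosen bit
    // flipped; zero-length input is returned unchanged, as in case 4:
    static byte[] flipOneBit(byte[] file, Random random) {
      if (file.length == 0) {
        return file.clone();
      }
      byte[] copy = file.clone();
      int byteToCorrupt = random.nextInt(copy.length);
      int bitToFlip = random.nextInt(8);
      copy[byteToCorrupt] ^= (byte) (1 << bitToFlip);
      return copy;
    }

    public static void main(String[] args) {
      byte[] original = "lucene segment bytes".getBytes(StandardCharsets.UTF_8);
      byte[] corrupted = flipOneBit(original, new Random(17));

      // Count differing bits; this is always exactly one:
      int diffBits = 0;
      for (int i = 0; i < original.length; i++) {
        diffBits += Integer.bitCount((original[i] ^ corrupted[i]) & 0xFF);
      }
      System.out.println("bits flipped: " + diffBits);
    }
  }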
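
Two of the smaller hunks cooperate: the simulated "virus scanner" in deleteFile() refuses one in four unforced deletes and records the name in triedToDelete, and the new triedToDelete.remove(name) in the create path ensures a file recreated under the same name is no longer treated as pending deletion. A rough standalone model of that bookkeeping; FlakyDeleter is an invented name, not part of the patch:

  import java.io.IOException;
  import java.util.HashSet;
  import java.util.Random;
  import java.util.Set;

  public class FlakyDeleter {

    private final Random random;
    final Set<String> triedToDelete = new HashSet<>();

    FlakyDeleter(Random random) {
      this.random = random;
    }

    void deleteFile(String name, boolean forced) throws IOException {
      // One in four unforced deletes is refused, as in the hunk above:
      if (!forced && random.nextInt(4) == 0) {
        triedToDelete.add(name);
        throw new IOException("cannot delete file: " + name + ", a virus scanner has it open");
      }
      triedToDelete.remove(name);
      // ... the wrapped directory would really delete the file here ...
    }

    void createOutput(String name) {
      // Recreating a refused name clears its pending-delete state,
      // matching the new triedToDelete.remove(name) call in the patch:
      triedToDelete.remove(name);
      // ... the wrapped directory would really create the file here ...
    }

    public static void main(String[] args) {
      FlakyDeleter dir = new FlakyDeleter(new Random(42));
      for (String name : new String[] {"_0.cfs", "_0.si", "_1.cfs"}) {
        try {
          dir.deleteFile(name, false);
        } catch (IOException ioe) {
          // Depending on the seed, some deletes are refused:
          System.out.println(ioe.getMessage());
        }
      }
      System.out.println("pending deletes: " + dir.triedToDelete);
    }
  }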
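
Finally, a note on the TestUtil.reduceOpenFiles() hunk further below: hoisting setNoCFSRatio(1.0) out of the two instanceof branches onto the shared MergePolicy means any merge policy now gets it, and a noCFSRatio of 1.0 says even the largest merged segment may use the compound format, which is what keeps the open-file count low. A hedged sketch of the post-patch behavior; the test class and method names are invented:

  import org.apache.lucene.analysis.MockAnalyzer;
  import org.apache.lucene.index.IndexWriter;
  import org.apache.lucene.index.IndexWriterConfig;
  import org.apache.lucene.index.TieredMergePolicy;
  import org.apache.lucene.store.Directory;
  import org.apache.lucene.util.LuceneTestCase;
  import org.apache.lucene.util.TestUtil;

  public class TestReduceOpenFilesSketch extends LuceneTestCase {

    public void testNoCFSRatioAppliesToAnyPolicy() throws Exception {
      Directory dir = newDirectory();
      IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()));
      iwc.setMergePolicy(new TieredMergePolicy());
      IndexWriter w = new IndexWriter(dir, iwc);

      TestUtil.reduceOpenFiles(w);

      // Regardless of the concrete policy, compound files are now forced:
      assertEquals(1.0, w.getConfig().getMergePolicy().getNoCFSRatio(), 0.0);

      w.close();
      dir.close();
    }
  }
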
>
> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
> ----------------------------------------------------------------------
> diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
> index aaab030..e6536f3 100644
> --- a/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
> +++ b/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
> @@ -1155,11 +1155,14 @@ public abstract class LuceneTestCase extends Assert {
>      }
>
>      if (rarely(r)) {
> -      // change warmer parameters
> -      if (r.nextBoolean()) {
> -        c.setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(c.getInfoStream()));
> -      } else {
> -        c.setMergedSegmentWarmer(null);
> +      IndexWriter.IndexReaderWarmer curWarmer = c.getMergedSegmentWarmer();
> +      if (curWarmer == null || curWarmer instanceof SimpleMergedSegmentWarmer) {
> +        // change warmer parameters
> +        if (r.nextBoolean()) {
> +          c.setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(c.getInfoStream()));
> +        } else {
> +          c.setMergedSegmentWarmer(null);
> +        }
>        }
>        didChange = true;
>      }
>
> http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1ae72914/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java
> ----------------------------------------------------------------------
> diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java b/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java
> index 99d4be3..de2cf57 100644
> --- a/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java
> +++ b/lucene/test-framework/src/java/org/apache/lucene/util/TestUtil.java
> @@ -976,15 +976,14 @@ public final class TestUtil {
>    public static void reduceOpenFiles(IndexWriter w) {
>      // keep number of open files lowish
>      MergePolicy mp = w.getConfig().getMergePolicy();
> +    mp.setNoCFSRatio(1.0);
>      if (mp instanceof LogMergePolicy) {
>        LogMergePolicy lmp = (LogMergePolicy) mp;
>        lmp.setMergeFactor(Math.min(5, lmp.getMergeFactor()));
> -      lmp.setNoCFSRatio(1.0);
>      } else if (mp instanceof TieredMergePolicy) {
>        TieredMergePolicy tmp = (TieredMergePolicy) mp;
>        tmp.setMaxMergeAtOnce(Math.min(5, tmp.getMaxMergeAtOnce()));
>        tmp.setSegmentsPerTier(Math.min(5, tmp.getSegmentsPerTier()));
> -      tmp.setNoCFSRatio(1.0);
>      }
>      MergeScheduler ms = w.getConfig().getMergeScheduler();
>      if (ms instanceof ConcurrentMergeScheduler) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
