thomasmueller commented on a change in pull request #517:
URL: https://github.com/apache/jackrabbit-oak/pull/517#discussion_r828907630
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -69,6 +71,9 @@
private final IndexEditorProvider indexEditorProvider;
private final AsyncIndexerLock indexerLock;
private final IndexDefinitionUpdater indexDefinitionUpdater;
+ static final int retries =
Integer.parseInt(System.getProperty("oak.index.import.retries", "5"));
Review comment:
should be RETRIES
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -58,7 +60,7 @@
*/
static final String ASYNC_LANE_SYNC = "sync";
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Logger log =
LoggerFactory.getLogger(IndexImporter.class);
Review comment:
Nit: if it's static, then in theory (according to the coding rules),
"log" should be "LOG". Since you have to change the other constants anyway, I
recommend changing this as well.
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -84,31 +89,74 @@ public IndexImporter(NodeStore nodeStore, File indexDir,
IndexEditorProvider ind
"checkpointed state [%s]", indexerInfo.checkpoint);
this.indexDefinitionUpdater = new IndexDefinitionUpdater(new
File(indexDir, INDEX_DEFINITIONS_JSON));
this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
+ this.indexPathsToUpdate = new HashSet<>();
+ }
+
+ enum IndexImportState {
+ SWITCHLANE, IMPORT_INDEX_DATA, BRING_INDEX_UPTODATE, RELEASE_CHECKPOINT
}
public void importIndex() throws IOException, CommitFailedException {
- if (indexes.keySet().isEmpty()) {
- log.warn("No indexes to import (possibly index definitions outside
of a oak:index node?)");
+ try {
+ if (indexes.keySet().isEmpty()) {
+ log.warn("No indexes to import (possibly index definitions
outside of a oak:index node?)");
+ }
+ log.info("Proceeding to import {} indexes from {}",
indexes.keySet(), indexDir.getAbsolutePath());
+
+ //TODO Need to review it for idempotent design. A failure in any
step should not
+ //leave setup in in consistent state and provide option for
recovery
+
+ //Step 1 - Switch the index lanes so that async indexer does not
touch them
+ //while we are importing the index data
+ runWithRetry(retries, IndexImportState.SWITCHLANE, () ->
switchLanes());
+ log.info("Done with switching of index lanes before import");
+
+ //Step 2 - Import the existing index data.
+ // In this step we are:
+ // switching lane for new index
+ // incrementing reindex count.
+ // marking index as disabled in case of superseded index
+ // after this step new index is available in repository
+ runWithRetry(retries, IndexImportState.IMPORT_INDEX_DATA, () ->
importIndexData());
+ log.info("Done with importing of index data");
+
+ //Step 3 - Bring index upto date.
+ // In this step we are:
+ // interrupting current indexing.
+ // reverting lane back to async
+ // resuming current indexing;
+ runWithRetry(retries, IndexImportState.BRING_INDEX_UPTODATE, () ->
bringIndexUpToDate());
+ log.info("Done with bringing index up-to-date");
+ //Step 4 - Release the checkpoint
+ // this is again an idempotent function
+ runWithRetry(retries, IndexImportState.RELEASE_CHECKPOINT, () ->
releaseCheckpoint());
+ log.info("Done with releasing checkpoint");
+
+ // Remove indexImportState property on successful import
+ updateIndexImporterState(null, true);
+ log.info("Done with removing index import state");
+
+ } catch (CommitFailedException | IOException e){
+ try{
Review comment:
Nit: a space after "try" is missing.
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -69,6 +71,9 @@
private final IndexEditorProvider indexEditorProvider;
private final AsyncIndexerLock indexerLock;
private final IndexDefinitionUpdater indexDefinitionUpdater;
+ static final int retries =
Integer.parseInt(System.getProperty("oak.index.import.retries", "5"));
+ public static final String indexImportStateKey = "indexImportState";
Review comment:
but here, I think capitalizing it is the right way:
INDEX_IMPORT_STATE_KEY
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -327,4 +408,25 @@ public String toString() {
}
}
+ interface IndexImporterStepExecutor {
+ void execute() throws CommitFailedException, IOException;
+ }
+
+ void runWithRetry(int maxRetries, IndexImportState indexImportState,
IndexImporterStepExecutor step) throws CommitFailedException, IOException {
+ int count = 0;
+ while (count < maxRetries) {
+ log.info("IndexImporterStepExecutor:{} ,count:{}",
indexImportState, count);
+ try {
+ step.execute();
+ break;
+ } catch (CommitFailedException | IOException e) {
+ log.info("IndexImporterStepExecutor:{} fail count: {}",
indexImportState, count);
Review comment:
I would use log.warn here, and also log the exception stack trace (that is,
append ", e").
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -84,31 +89,74 @@ public IndexImporter(NodeStore nodeStore, File indexDir,
IndexEditorProvider ind
"checkpointed state [%s]", indexerInfo.checkpoint);
this.indexDefinitionUpdater = new IndexDefinitionUpdater(new
File(indexDir, INDEX_DEFINITIONS_JSON));
this.asyncLaneToIndexMapping = mapIndexesToLanes(indexes);
+ this.indexPathsToUpdate = new HashSet<>();
+ }
+
+ enum IndexImportState {
+ SWITCHLANE, IMPORT_INDEX_DATA, BRING_INDEX_UPTODATE, RELEASE_CHECKPOINT
}
public void importIndex() throws IOException, CommitFailedException {
- if (indexes.keySet().isEmpty()) {
- log.warn("No indexes to import (possibly index definitions outside
of a oak:index node?)");
+ try {
+ if (indexes.keySet().isEmpty()) {
+ log.warn("No indexes to import (possibly index definitions
outside of a oak:index node?)");
+ }
+ log.info("Proceeding to import {} indexes from {}",
indexes.keySet(), indexDir.getAbsolutePath());
+
+ //TODO Need to review it for idempotent design. A failure in any
step should not
+ //leave setup in in consistent state and provide option for
recovery
+
+ //Step 1 - Switch the index lanes so that async indexer does not
touch them
+ //while we are importing the index data
+ runWithRetry(retries, IndexImportState.SWITCHLANE, () ->
switchLanes());
+ log.info("Done with switching of index lanes before import");
+
+ //Step 2 - Import the existing index data.
+ // In this step we are:
+ // switching lane for new index
+ // incrementing reindex count.
+ // marking index as disabled in case of superseded index
+ // after this step new index is available in repository
+ runWithRetry(retries, IndexImportState.IMPORT_INDEX_DATA, () ->
importIndexData());
+ log.info("Done with importing of index data");
+
+ //Step 3 - Bring index upto date.
+ // In this step we are:
+ // interrupting current indexing.
+ // reverting lane back to async
+ // resuming current indexing;
+ runWithRetry(retries, IndexImportState.BRING_INDEX_UPTODATE, () ->
bringIndexUpToDate());
+ log.info("Done with bringing index up-to-date");
+ //Step 4 - Release the checkpoint
+ // this is again an idempotent function
+ runWithRetry(retries, IndexImportState.RELEASE_CHECKPOINT, () ->
releaseCheckpoint());
+ log.info("Done with releasing checkpoint");
+
+ // Remove indexImportState property on successful import
+ updateIndexImporterState(null, true);
+ log.info("Done with removing index import state");
+
+ } catch (CommitFailedException | IOException e){
+ try{
Review comment:
I think the first thing we should do here is log the error.
##########
File path:
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
##########
@@ -538,4 +557,153 @@ private void dumpIndexDefinitions(String... indexPaths)
throws IOException, Comm
printer.print(pw, Format.JSON, false);
Files.write(sw.toString(), file, UTF_8);
}
+
+ private String importData_IncrementalUpdate_Before_Setup_Method() throws
IOException, CommitFailedException {
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "fooIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("a").setProperty("foo", "abc");
+ builder.child("b").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ new AsyncIndexUpdate("async", store, provider).run();
+
+ String checkpoint = createIndexDirs("/oak:index/fooIndex");
+ builder = store.getRoot().builder();
+ builder.child("c").setProperty("foo", "abc");
+ builder.child("d").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ new AsyncIndexUpdate("async", store, provider).run();
+
+ FilterImpl f = createFilter(store.getRoot(), NT_BASE);
+ PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
+ assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc", f));
+
+ builder = store.getRoot().builder();
+ builder.child("e").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ return checkpoint;
+ }
+
+ private IndexImporterProvider getImporterProvider(String checkpoint) {
+ IndexImporterProvider importerProvider = new IndexImporterProvider() {
+ @Override
+ public void importIndex(NodeState root, NodeBuilder defn, File
indexDir) {
+ assertEquals("fooIndex", indexDir.getName());
+ assertEquals(2,
defn.getProperty(REINDEX_COUNT).getValue(Type.LONG).longValue());
+
defn.getChildNode(IndexConstants.INDEX_CONTENT_NODE_NAME).remove();
+
+ NodeState cpState = store.retrieve(checkpoint);
+ NodeState indexData = NodeStateUtils.getNode(cpState,
"/oak:index/fooIndex/:index");
+ defn.setChildNode(IndexConstants.INDEX_CONTENT_NODE_NAME,
indexData);
+ }
+
+ @Override
+ public String getType() {
+ return "property";
+ }
+ };
+ return importerProvider;
+ }
+
+ @Test
+ public void
importData_IncrementalUpdate_Test_Failure_AT_SWITCHLANE_State() throws
Exception {
Review comment:
This is a mix of camel case, underscore, caps...
importDataIncrementalUpdateTestFailureAtSwitchLaneState
##########
File path:
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
##########
@@ -538,4 +557,153 @@ private void dumpIndexDefinitions(String... indexPaths)
throws IOException, Comm
printer.print(pw, Format.JSON, false);
Files.write(sw.toString(), file, UTF_8);
}
+
+ private String importData_IncrementalUpdate_Before_Setup_Method() throws
IOException, CommitFailedException {
Review comment:
That should be importDataIncrementalUpdateBeforeSetup, right?
##########
File path:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
##########
@@ -327,4 +408,25 @@ public String toString() {
}
}
+ interface IndexImporterStepExecutor {
+ void execute() throws CommitFailedException, IOException;
+ }
+
+ void runWithRetry(int maxRetries, IndexImportState indexImportState,
IndexImporterStepExecutor step) throws CommitFailedException, IOException {
+ int count = 0;
+ while (count < maxRetries) {
+ log.info("IndexImporterStepExecutor:{} ,count:{}",
indexImportState, count);
+ try {
+ step.execute();
+ break;
+ } catch (CommitFailedException | IOException e) {
+ log.info("IndexImporterStepExecutor:{} fail count: {}",
indexImportState, count);
+ if (++count >= maxRetries) {
+ log.info("IndexImporterStepExecutor:{} failed after {}
retries", indexImportState, maxRetries);
Review comment:
I would use log.warn here.
##########
File path:
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
##########
@@ -538,4 +557,153 @@ private void dumpIndexDefinitions(String... indexPaths)
throws IOException, Comm
printer.print(pw, Format.JSON, false);
Files.write(sw.toString(), file, UTF_8);
}
+
+ private String importData_IncrementalUpdate_Before_Setup_Method() throws
IOException, CommitFailedException {
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "fooIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("a").setProperty("foo", "abc");
+ builder.child("b").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ new AsyncIndexUpdate("async", store, provider).run();
+
+ String checkpoint = createIndexDirs("/oak:index/fooIndex");
+ builder = store.getRoot().builder();
+ builder.child("c").setProperty("foo", "abc");
+ builder.child("d").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ new AsyncIndexUpdate("async", store, provider).run();
+
+ FilterImpl f = createFilter(store.getRoot(), NT_BASE);
+ PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
+ assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc", f));
+
+ builder = store.getRoot().builder();
+ builder.child("e").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ return checkpoint;
+ }
+
+ private IndexImporterProvider getImporterProvider(String checkpoint) {
+ IndexImporterProvider importerProvider = new IndexImporterProvider() {
+ @Override
+ public void importIndex(NodeState root, NodeBuilder defn, File
indexDir) {
+ assertEquals("fooIndex", indexDir.getName());
+ assertEquals(2,
defn.getProperty(REINDEX_COUNT).getValue(Type.LONG).longValue());
+
defn.getChildNode(IndexConstants.INDEX_CONTENT_NODE_NAME).remove();
+
+ NodeState cpState = store.retrieve(checkpoint);
+ NodeState indexData = NodeStateUtils.getNode(cpState,
"/oak:index/fooIndex/:index");
+ defn.setChildNode(IndexConstants.INDEX_CONTENT_NODE_NAME,
indexData);
+ }
+
+ @Override
+ public String getType() {
+ return "property";
+ }
+ };
+ return importerProvider;
+ }
+
+ @Test
+ public void
importData_IncrementalUpdate_Test_Failure_AT_SWITCHLANE_State() throws
Exception {
+ String checkpoint = importData_IncrementalUpdate_Before_Setup_Method();
+ try {
+ IndexImporter importer = new IndexImporter(store,
temporaryFolder.getRoot(), provider, NOOP_LOCK) {
+ @Override
+ void switchLanes() throws CommitFailedException, IOException {
+ throw new IOException("Explicitly throw IOException");
+ }
+ };
+ importer.addImporterProvider(getImporterProvider(checkpoint));
+ importer.importIndex();
+ } finally {
+ NodeState idx =
store.getRoot().getChildNode("oak:index").getChildNode("fooIndex");
+ assertEquals("async", idx.getString("async"));
+ FilterImpl f = createFilter(store.getRoot(), NT_BASE);
+ PropertyIndexLookup lookup = new
PropertyIndexLookup(store.getRoot());
+ assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc",
f));
+
+ lookup = new PropertyIndexLookup(store.getRoot());
+ //It would not pickup /e as thats not yet indexed as part of last
checkpoint
+ assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc",
f));
+ // checkpoint is not released because of failure
+ assertNotNull(store.retrieve(checkpoint));
+ // As failure is on switchlanes no update happened i.e. no state
change got committed
+ assertEquals("State matching failed",
+ null,
+
store.getRoot().getChildNode("oak:index").getChildNode("fooIndex").getProperty(IndexImporter.indexImportStateKey));
+
+ // Test retry logic
+ String failureLog =
MessageFormat.format("IndexImporterStepExecutor:{0} failed after {1} retries",
+ IndexImporter.IndexImportState.SWITCHLANE,
IndexImporter.retries);
+ boolean failureLogPresent = false;
+ for (String log : customizer.getLogs()) {
+ if (log.equals(failureLog)) {
+ failureLogPresent = true;
+ break;
+ }
+ }
+ assertTrue(failureLogPresent);
+
+ assertEquals("State matching failed",
+ null,
+
store.getRoot().getChildNode("oak:index").getChildNode("fooIndex").getProperty(IndexImporter.indexImportStateKey));
+ }
+ }
+
+ @Test
+ public void
importData_IncrementalUpdate_Test_Failure_AT_importIndexData_State() throws
Exception {
+ String checkpoint = importData_IncrementalUpdate_Before_Setup_Method();
+ try {
+ IndexImporter importer = new IndexImporter(store,
temporaryFolder.getRoot(), provider, NOOP_LOCK) {
+ @Override
+ void importIndexData() throws CommitFailedException,
IOException {
+ throw new IOException("Explicitly throw IOException");
+ }
+ };
+ ;
+ importer.addImporterProvider(getImporterProvider(checkpoint));
+ importer.importIndex();
+ } finally {
+ NodeState idx =
store.getRoot().getChildNode("oak:index").getChildNode("fooIndex");
+ assertEquals("async", idx.getString("async"));
+ FilterImpl f = createFilter(store.getRoot(), NT_BASE);
+ PropertyIndexLookup lookup = new
PropertyIndexLookup(store.getRoot());
+ assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc",
f));
+
+ lookup = new PropertyIndexLookup(store.getRoot());
+ //It would not pickup /e as thats not yet indexed as part of last
checkpoint
+ assertEquals(of("a", "b", "c", "d"), find(lookup, "foo", "abc",
f));
+ assertNotNull(store.retrieve(checkpoint));
+ assertEquals("State matching failed",
+ IndexImporter.IndexImportState.SWITCHLANE.toString(),
+
store.getRoot().getChildNode("oak:index").getChildNode("fooIndex").getProperty(IndexImporter.indexImportStateKey).getValue(Type.STRING));
+ }
+ }
+
+ @Test
+ public void importData_IncrementalUpdate_NO_FAILURE() throws Exception {
Review comment:
importDataIncrementalUpdateNoFailure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]