This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f4ef0d4bd8e290bc3b3b6a4bd45713e38e6c1edf
Author: Gautam Parai <[email protected]>
AuthorDate: Fri Sep 21 18:44:38 2018 -0700

    DRILL-6824: Handle schema changes in MapRDBJsonRecordReader

    closes #1518
---
 .../store/mapr/db/json/MaprDBJsonRecordReader.java | 127 ++++++++++++++++-----
 .../drill/exec/physical/impl/OutputMutator.java    |   5 +
 .../apache/drill/exec/physical/impl/ScanBatch.java |   2 +
 .../store/parquet/ParquetRecordReaderTest.java     |   5 +
 4 files changed, 113 insertions(+), 26 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 0be44e8..5b849ea 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReaderUtils;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.hadoop.fs.Path;
+import org.ojai.Document;
 import org.ojai.DocumentReader;
 import org.ojai.DocumentStream;
 import org.ojai.FieldPath;
@@ -77,6 +78,7 @@ import static org.ojai.DocumentConstants.ID_FIELD;
 public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private static final Logger logger = LoggerFactory.getLogger(MaprDBJsonRecordReader.class);
 
+  protected enum SchemaState {SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE};
   protected static final FieldPath[] ID_ONLY_PROJECTION = { ID_FIELD };
@@ -94,16 +96,19 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private OperatorContext operatorContext;
   protected VectorContainerWriter vectorWriter;
   private DBDocumentReaderBase reader;
+  Document document;
+  protected OutputMutator vectorWriterMutator;
 
   private DrillBuf buffer;
 
   private DocumentStream documentStream;
 
   private Iterator<DocumentReader> documentReaderIterators;
+  private Iterator<Document> documentIterator;
 
   private boolean includeId;
   private boolean idOnly;
-
+  private SchemaState schemaState;
   private boolean projectWholeDocument;
   private FieldProjector projector;
@@ -121,11 +126,16 @@
   protected OjaiValueWriter valueWriter;
   protected DocumentReaderVectorWriter documentWriter;
   protected int maxRecordsToRead = -1;
+  protected DBDocumentReaderBase lastDocumentReader;
+  protected Document lastDocument;
 
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
                                 List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) {
     this(subScanSpec, formatPlugin, projectedColumns, context);
     this.maxRecordsToRead = maxRecords;
+    this.lastDocumentReader = null;
+    this.lastDocument = null;
+    this.schemaState = SchemaState.SCHEMA_UNKNOWN;
   }
 
   protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
@@ -264,34 +274,40 @@
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     this.vectorWriter = new VectorContainerWriter(output, unionEnabled);
+    this.vectorWriterMutator = output;
     this.operatorContext = context;
 
     try {
       table.setOption(TableOption.EXCLUDEID, !includeId);
       documentStream = table.find(condition, scannedFields);
-      documentReaderIterators = documentStream.documentReaders().iterator();
-
-      if (allTextMode) {
-        valueWriter = new AllTextValueWriter(buffer);
-      } else if (readNumbersAsDouble) {
-        valueWriter = new NumbersAsDoubleValueWriter(buffer);
-      } else {
-        valueWriter = new OjaiValueWriter(buffer);
-      }
-
-      if (projectWholeDocument) {
-        documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
-      } else if (isSkipQuery()) {
-        documentWriter = new RowCountVectorWriter(valueWriter);
-      } else if (idOnly) {
-        documentWriter = new IdOnlyVectorWriter(valueWriter);
-      } else {
-        documentWriter = new FieldTransferVectorWriter(valueWriter);
-      }
+      documentIterator = documentStream.iterator();
+      setupWriter();
     } catch (DBException ex) {
       throw new ExecutionSetupException(ex);
     }
   }
 
+  /*
+   * Set up the valueWriter and documentWriter based on config options.
+   */
+  private void setupWriter() {
+    if (allTextMode) {
+      valueWriter = new AllTextValueWriter(buffer);
+    } else if (readNumbersAsDouble) {
+      valueWriter = new NumbersAsDoubleValueWriter(buffer);
+    } else {
+      valueWriter = new OjaiValueWriter(buffer);
+    }
+
+    if (projectWholeDocument) {
+      documentWriter = new ProjectionPassthroughVectorWriter(valueWriter, projector, includeId);
+    } else if (isSkipQuery()) {
+      documentWriter = new RowCountVectorWriter(valueWriter);
+    } else if (idOnly) {
+      documentWriter = new IdOnlyVectorWriter(valueWriter);
+    } else {
+      documentWriter = new FieldTransferVectorWriter(valueWriter);
+    }
+  }
 
   @Override
   public int next() {
@@ -303,33 +319,71 @@
     int recordCount = 0;
     reader = null;
+    document = null;
 
     int maxRecordsForThisBatch = this.maxRecordsToRead >= 0?
         Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead) : BaseValueVector.INITIAL_VALUE_ALLOCATION;
 
+    try {
+      // If the last document caused a SchemaChange, create a new output schema for this scan batch.
+      if (schemaState == SchemaState.SCHEMA_CHANGE && !ignoreSchemaChange) {
+        // Clear the ScanBatch vector container writer/mutator in order to be able to generate the new schema.
+        vectorWriterMutator.clear();
+        vectorWriter = new VectorContainerWriter(vectorWriterMutator, unionEnabled);
+        logger.debug("Encountered schema change earlier, using new writer {}", vectorWriter.toString());
+        document = lastDocument;
+        setupWriter();
+        if (recordCount < maxRecordsForThisBatch) {
+          vectorWriter.setPosition(recordCount);
+          if (document != null) {
+            reader = (DBDocumentReaderBase) document.asReader();
+            documentWriter.writeDBDocument(vectorWriter, reader);
+            recordCount++;
+          }
+        }
+      }
+    } catch (SchemaChangeException e) {
+      String err_row = reader.getId().asJsonString();
+      if (ignoreSchemaChange) {
+        logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
+        logger.debug("Stack trace:", e);
+      } else {
+        /* We should not encounter a SchemaChangeException here since this is the first document for this
+         * new schema. Something is very wrong - cannot handle any further!
+         */
+        throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
+      }
+    }
+
+    schemaState = SchemaState.SCHEMA_INIT;
     while(recordCount < maxRecordsForThisBatch) {
       vectorWriter.setPosition(recordCount);
       try {
-        reader = nextDocumentReader();
-        if (reader == null) {
+        document = nextDocument();
+        if (document == null) {
           break; // no more documents for this reader
         } else {
-          documentWriter.writeDBDocument(vectorWriter, reader);
+          documentWriter.writeDBDocument(vectorWriter, (DBDocumentReaderBase) document.asReader());
         }
         recordCount++;
       } catch (UserException e) {
         throw UserException.unsupportedError(e)
             .addContext(String.format("Table: %s, document id: '%s'",
                 table.getPath(),
-                reader == null ? null : IdCodec.asString(reader.getId())))
+                document.asReader() == null ? null :
+                IdCodec.asString(((DBDocumentReaderBase) document.asReader()).getId())))
             .build(logger);
       } catch (SchemaChangeException e) {
-        String err_row = reader.getId().asJsonString();
+        String err_row = ((DBDocumentReaderBase) document.asReader()).getId().asJsonString();
         if (ignoreSchemaChange) {
           logger.warn("{}. Dropping row '{}' from result.", e.getMessage(), err_row);
           logger.debug("Stack trace:", e);
         } else {
-          throw dataReadError(logger, e, "SchemaChangeException for row '%s'.", err_row);
+          /* Save the current document for the next iteration. The recordCount is not updated, so we
+           * will start from this document on the next call to next().
+           */
+          lastDocument = document;
+          schemaState = SchemaState.SCHEMA_CHANGE;
+          break;
         }
       }
     }
@@ -367,6 +421,27 @@
     }
   }
 
+  protected Document nextDocument() {
+    final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
+    try {
+      if (operatorStats != null) {
+        operatorStats.startWait();
+      }
+      try {
+        if (!documentIterator.hasNext()) {
+          return null;
+        } else {
+          return documentIterator.next();
+        }
+      } finally {
+        if (operatorStats != null) {
+          operatorStats.stopWait();
+        }
+      }
+    } catch (DBException e) {
+      throw dataReadError(logger, e);
+    }
+  }
+
   /*
    * Extracts contiguous named segments from the SchemaPath, starting from the
    * root segment and build the FieldPath from it for projection.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index b25b001..b78eaa1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -68,4 +68,9 @@
    * @return the CallBack object for this mutator
    */
   public CallBack getCallBack();
+
+  /**
+   * Clear this mutator, i.e. reset it to a pristine condition.
+   */
+  public void clear();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a688f37..0aa8328 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -491,9 +491,11 @@ public class ScanBatch implements CloseableRecordBatch {
       return callBack;
     }
 
+    @Override
     public void clear() {
       regularFieldVectorMap.clear();
       implicitFieldVectorMap.clear();
+      container.clear();
       schemaChanged = false;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index f29002e..1bd90b3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -364,6 +364,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     public CallBack getCallBack() {
       return null;
     }
+
+    @Override
+    public void clear() {
+      // Nothing to do!
+    }
   }
 
   private void validateFooters(final List<Footer> metadata) {
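
For readers following the logic rather than the hunks: the heart of the change is a small replay loop in next(). When writing a document raises SchemaChangeException, the reader saves that document, ends the current batch, and on the following next() call clears the output mutator, rebuilds the vector writer, and replays the saved document as the first record of the new batch. The sketch below is a minimal, self-contained illustration of that pattern, not Drill code; the class, the Map-based documents, and schemaOf() are hypothetical stand-ins for the OJAI document stream and Drill's schema machinery.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Standalone sketch of the replay pattern above. All names here are
// hypothetical stand-ins: the real reader works on OJAI Documents and
// Drill value vectors, not on Maps and Lists.
public class SchemaChangeReplaySketch {

  enum SchemaState { SCHEMA_UNKNOWN, SCHEMA_INIT, SCHEMA_CHANGE }

  static class SchemaChangeException extends Exception {
  }

  private final Iterator<Map<String, Object>> documents;
  private SchemaState schemaState = SchemaState.SCHEMA_UNKNOWN;
  private Map<String, Object> lastDocument; // document that triggered the change
  private String batchSchema;               // stand-in for the output schema

  SchemaChangeReplaySketch(Iterator<Map<String, Object>> documents) {
    this.documents = documents;
  }

  // Fills one "batch" and returns the number of records written;
  // 0 means the scan is exhausted.
  int next(List<Map<String, Object>> batch, int maxRecords) {
    int recordCount = 0;
    if (schemaState == SchemaState.SCHEMA_CHANGE) {
      // The last document of the previous call did not fit the old schema:
      // adopt its schema (the real code clears the mutator and rebuilds the
      // vector writer) and replay the saved document first.
      batchSchema = schemaOf(lastDocument);
      batch.add(lastDocument);
      recordCount++;
    }
    schemaState = SchemaState.SCHEMA_INIT;
    while (recordCount < maxRecords && documents.hasNext()) {
      Map<String, Object> doc = documents.next();
      try {
        write(doc);
        batch.add(doc);
        recordCount++;
      } catch (SchemaChangeException e) {
        // Save the document and cut the batch short; recordCount was not
        // advanced for it, so it becomes the first record of the next batch.
        lastDocument = doc;
        schemaState = SchemaState.SCHEMA_CHANGE;
        break;
      }
    }
    return recordCount;
  }

  private void write(Map<String, Object> doc) throws SchemaChangeException {
    String docSchema = schemaOf(doc);
    if (batchSchema == null) {
      batchSchema = docSchema; // the first document fixes the batch schema
    } else if (!batchSchema.equals(docSchema)) {
      throw new SchemaChangeException();
    }
  }

  // Toy "schema": the sorted top-level field names, e.g. "a,b".
  private static String schemaOf(Map<String, Object> doc) {
    return String.join(",", new TreeSet<>(doc.keySet()));
  }

  public static void main(String[] args) {
    List<Map<String, Object>> docs = List.of(
        Map.<String, Object>of("a", 1, "b", 2),
        Map.<String, Object>of("a", 3, "b", 4),
        Map.<String, Object>of("a", 5, "c", 6)); // schema changes: {a,b} -> {a,c}
    SchemaChangeReplaySketch reader = new SchemaChangeReplaySketch(docs.iterator());
    List<Map<String, Object>> batch = new ArrayList<>();
    int n;
    while ((n = reader.next(batch, 4096)) > 0) {
      // prints two batches: sizes 2 and 1, split at the schema change
      System.out.println("batch of " + n + ": " + batch);
      batch.clear();
    }
  }
}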
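
The companion API change is the new OutputMutator.clear(): the reader can only regenerate its output schema if the mutator forgets its field vectors and the enclosing ScanBatch also drops the vectors already in its container, which is why ScanBatch.clear() gains a container.clear() call. Below is a minimal sketch of that contract, again with hypothetical stand-in types (Vector, Container, MutatorSketch) rather than Drill's ValueVector, VectorContainer, and ScanBatch.Mutator.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Drill's ValueVector and VectorContainer.
public class MutatorClearSketch {

  static class Vector {
    final String name;
    Vector(String name) { this.name = name; }
  }

  static class Container {
    private final Map<String, Vector> vectors = new HashMap<>();
    void add(Vector v) { vectors.put(v.name, v); }
    void clear() { vectors.clear(); } // release every vector
    int size() { return vectors.size(); }
  }

  // The contract the commit adds: clearing the mutator must also clear the
  // downstream container, otherwise vectors built for the old schema survive
  // the reset and collide with the new schema.
  static class MutatorSketch {
    private final Map<String, Vector> fieldVectorMap = new HashMap<>();
    private final Container container;

    MutatorSketch(Container container) { this.container = container; }

    Vector addField(String name) {
      Vector v = fieldVectorMap.computeIfAbsent(name, Vector::new);
      container.add(v);
      return v;
    }

    void clear() {
      fieldVectorMap.clear();
      container.clear(); // the line ScanBatch.clear() gains in this commit
    }
  }

  public static void main(String[] args) {
    Container container = new Container();
    MutatorSketch mutator = new MutatorSketch(container);
    mutator.addField("a");
    mutator.addField("b");
    mutator.clear();                      // schema change: start from scratch
    mutator.addField("a");
    mutator.addField("c");
    System.out.println(container.size()); // 2, not 4: old vectors are gone
  }
}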
