MD-813: Improve count(*) queries against MapR-DB JSON tables. + Fail the query on schema change. + Added a configuration option 'ignoreSchemaChange' which, when enabled, drops the offending rows from the result instead of failing the query.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/12cbd27e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/12cbd27e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/12cbd27e Branch: refs/heads/master Commit: 12cbd27ef8cf23db15645453d815a631926bd8cd Parents: 391027a Author: Aditya <adi...@mapr.com> Authored: Wed Mar 16 01:58:56 2016 -0700 Committer: Aditya Kishore <a...@apache.org> Committed: Fri Sep 9 10:08:40 2016 -0700 ---------------------------------------------------------------------- .../store/mapr/db/MapRDBFormatPluginConfig.java | 14 ++- .../mapr/db/json/MaprDBJsonRecordReader.java | 90 +++++++++++++------- .../store/mapr/streams/StreamsFormatPlugin.java | 3 +- 3 files changed, 73 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/12cbd27e/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java index 7295265..1bb07ed 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java @@ -28,8 +28,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { public boolean allTextMode = false; - public boolean readAllNumbersAsDouble = false; public boolean enablePushdown = true; + public boolean ignoreSchemaChange = false; + public boolean readAllNumbersAsDouble = false; @Override public int hashCode() { @@ -43,10 +44,11 @@ 
public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { return false; } else if (allTextMode != other.allTextMode) { return false; + } else if (isIgnoreSchemaChange() != other.isIgnoreSchemaChange()) { + return false; } else if (enablePushdown != other.enablePushdown) { return false; } - return true; } @@ -77,4 +79,12 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { this.enablePushdown = enablePushdown; } + public boolean isIgnoreSchemaChange() { + return ignoreSchemaChange; + } + + public void setIgnoreSchemaChange(boolean ignoreSchemaChange) { + this.ignoreSchemaChange = ignoreSchemaChange; + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/12cbd27e/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index cb86e32..9bb7daa 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -32,10 +32,12 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType; import org.apache.drill.exec.store.AbstractRecordReader; import 
org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig; import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec; @@ -90,6 +92,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private final boolean readNumbersAsDouble; private boolean disablePushdown; private final boolean allTextMode; + private final boolean ignoreSchemaChange; public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPluginConfig formatPluginConfig, @@ -111,6 +114,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble(); allTextMode = formatPluginConfig.isAllTextMode(); + ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange(); disablePushdown = !formatPluginConfig.isEnablePushdown(); } @@ -177,35 +181,23 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { DBDocumentReaderBase reader = null; while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) { + vectorWriter.setPosition(recordCount); try { reader = nextDocumentReader(); - if (reader == null) break; - - vectorWriter.setPosition(recordCount); - MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap()); - if (idOnly) { - Value id = reader.getId(); - try { - switch(id.getType()) { - case STRING: - writeString(writer, ID_KEY, id.getString()); - break; - case BINARY: - writeBinary(writer, ID_KEY, id.getBinary()); - break; - default: - throw new UnsupportedOperationException(id.getType() + - " is not a supported type for _id field."); - } - } catch (IllegalStateException | IllegalArgumentException e) { - logger.warn(String.format("Possible schema change at _id: '%s'", - IdCodec.asString(id)), e); - } + if (reader == null) { + break; // no more documents for this scanner + } else if (isSkipQuery()) { + vectorWriter.rootAsMap().bit("count").writeBit(1); } else { - if (reader.next() != 
EventType.START_MAP) { - throw dataReadError("The document did not start with START_MAP!"); + MapOrListWriterImpl writer = new MapOrListWriterImpl(vectorWriter.rootAsMap()); + if (idOnly) { + writeId(writer, reader.getId()); + } else { + if (reader.next() != EventType.START_MAP) { + throw dataReadError("The document did not start with START_MAP!"); + } + writeToListOrMap(writer, reader); } - writeToListOrMap(writer, reader); } recordCount++; } catch (UserException e) { @@ -214,6 +206,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { table.getPath(), reader == null ? null : IdCodec.asString(reader.getId()))) .build(logger); + } catch (SchemaChangeException e) { + if (ignoreSchemaChange) { + logger.warn("{}. Dropping the row from result.", e.getMessage()); + logger.debug("Stack trace:", e); + } else { + throw dataReadError(e); + } } } @@ -222,7 +221,25 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { return recordCount; } - private void writeToListOrMap(MapOrListWriterImpl writer, DBDocumentReaderBase reader) { + private void writeId(MapOrListWriterImpl writer, Value id) throws SchemaChangeException { + try { + switch(id.getType()) { + case STRING: + writeString(writer, ID_KEY, id.getString()); + break; + case BINARY: + writeBinary(writer, ID_KEY, id.getBinary()); + break; + default: + throw new UnsupportedOperationException(id.getType() + + " is not a supported type for _id field."); + } + } catch (IllegalStateException | IllegalArgumentException e) { + throw schemaChangeException(e, "Possible schema change at _id: '%s'", IdCodec.asString(id)); + } + } + + private void writeToListOrMap(MapOrListWriterImpl writer, DBDocumentReaderBase reader) throws SchemaChangeException { String fieldName = null; writer.start(); outside: while (true) { @@ -289,8 +306,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { throw unsupportedError("Unsupported type: %s encountered during the query.", event); } } catch 
(IllegalStateException | IllegalArgumentException e) { - logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'", - IdCodec.asString(reader.getId()), fieldName), e); + throw schemaChangeException(e, "Possible schema change at _id: '%s', field: '%s'", IdCodec.asString(reader.getId()), fieldName); } } writer.end(); @@ -410,12 +426,24 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { .build(logger); } + private UserException dataReadError(Throwable t) { + return dataReadError(t, null); + } + private UserException dataReadError(String format, Object... args) { - return UserException.dataReadError() - .message(String.format(format, args)) + return dataReadError(null, format, args); + } + + private UserException dataReadError(Throwable t, String format, Object... args) { + return UserException.dataReadError(t) + .message(format == null ? null : String.format(format, args)) .build(logger); } + private SchemaChangeException schemaChangeException(Throwable t, String format, Object... args) { + return new SchemaChangeException(format, t, args); + } + private DBDocumentReaderBase nextDocumentReader() { final OperatorStats operatorStats = operatorContext == null ? 
null : operatorContext.getStats(); try { @@ -434,7 +462,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } } } catch (DBException e) { - throw UserException.dataReadError(e).build(logger); + throw dataReadError(e); } } http://git-wip-us.apache.org/repos/asf/drill/blob/12cbd27e/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java index 811e245..f7c76b5 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.mapr.streams; import java.io.IOException; import java.util.List; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractGroupScan; @@ -73,7 +74,7 @@ public class StreamsFormatPlugin extends TableFormatPlugin { List<String> files = selection.getFiles(); assert (files.size() == 1); //TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0))); - throw new UnsupportedOperationException("not implemented"); + throw UserException.unsupportedError().message("MapR streams can not be querried at this time.").build(logger); } }