MD-813: Improve count(*) queries against MapR-DB Json tables.

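+ Fail the query when a schema change is encountered while reading a row.
+ Added a format configuration option 'ignoreSchemaChange' (default: false)
  which, when enabled, drops the offending rows from the result (logging a
  warning) instead of failing the query.
+ For skip queries such as count(*), write only a placeholder 'count' bit per
  row instead of reading each document's fields.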


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/12cbd27e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/12cbd27e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/12cbd27e

Branch: refs/heads/master
Commit: 12cbd27ef8cf23db15645453d815a631926bd8cd
Parents: 391027a
Author: Aditya <adi...@mapr.com>
Authored: Wed Mar 16 01:58:56 2016 -0700
Committer: Aditya Kishore <a...@apache.org>
Committed: Fri Sep 9 10:08:40 2016 -0700

----------------------------------------------------------------------
 .../store/mapr/db/MapRDBFormatPluginConfig.java | 14 ++-
 .../mapr/db/json/MaprDBJsonRecordReader.java    | 90 +++++++++++++-------
 .../store/mapr/streams/StreamsFormatPlugin.java |  3 +-
 3 files changed, 73 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/12cbd27e/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
index 7295265..1bb07ed 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java
@@ -28,8 +28,9 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
 
   public boolean allTextMode = false;
-  public boolean readAllNumbersAsDouble = false;
   public boolean enablePushdown = true;
+  public boolean ignoreSchemaChange = false;
+  public boolean readAllNumbersAsDouble = false;
 
   @Override
   public int hashCode() {
@@ -43,10 +44,11 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
       return false;
     } else if (allTextMode != other.allTextMode) {
       return false;
+    } else if (isIgnoreSchemaChange() != other.isIgnoreSchemaChange()) {
+      return false;
     } else if (enablePushdown != other.enablePushdown) {
       return false;
     }
-
     return true;
   }
 
@@ -77,4 +79,12 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {
     this.enablePushdown = enablePushdown;
   }
 
+  public boolean isIgnoreSchemaChange() {
+    return ignoreSchemaChange;
+  }
+
+  public void setIgnoreSchemaChange(boolean ignoreSchemaChange) {
+    this.ignoreSchemaChange = ignoreSchemaChange;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/12cbd27e/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index cb86e32..9bb7daa 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -32,10 +32,12 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
 import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
@@ -90,6 +92,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   private final boolean readNumbersAsDouble;
   private boolean disablePushdown;
   private final boolean allTextMode;
+  private final boolean ignoreSchemaChange;
 
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
       MapRDBFormatPluginConfig formatPluginConfig,
@@ -111,6 +114,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
     readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
     allTextMode = formatPluginConfig.isAllTextMode();
+    ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange();
     disablePushdown = !formatPluginConfig.isEnablePushdown();
   }
 
@@ -177,35 +181,23 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     DBDocumentReaderBase reader = null;
 
     while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
+      vectorWriter.setPosition(recordCount);
       try {
         reader = nextDocumentReader();
-        if (reader == null) break;
-
-        vectorWriter.setPosition(recordCount);
-        MapOrListWriterImpl writer = new 
MapOrListWriterImpl(vectorWriter.rootAsMap());
-        if (idOnly) {
-          Value id = reader.getId();
-          try {
-            switch(id.getType()) {
-            case STRING:
-              writeString(writer, ID_KEY, id.getString());
-              break;
-            case BINARY:
-              writeBinary(writer, ID_KEY, id.getBinary());
-              break;
-            default:
-              throw new UnsupportedOperationException(id.getType() +
-                  " is not a supported type for _id field.");
-            }
-          } catch (IllegalStateException | IllegalArgumentException e) {
-            logger.warn(String.format("Possible schema change at _id: '%s'",
-                IdCodec.asString(id)), e);
-          }
+        if (reader == null) {
+          break; // no more documents for this scanner
+        } else if (isSkipQuery()) {
+          vectorWriter.rootAsMap().bit("count").writeBit(1);
         } else {
-          if (reader.next() != EventType.START_MAP) {
-            throw dataReadError("The document did not start with START_MAP!");
+          MapOrListWriterImpl writer = new 
MapOrListWriterImpl(vectorWriter.rootAsMap());
+          if (idOnly) {
+            writeId(writer, reader.getId());
+          } else {
+            if (reader.next() != EventType.START_MAP) {
+              throw dataReadError("The document did not start with 
START_MAP!");
+            }
+            writeToListOrMap(writer, reader);
           }
-          writeToListOrMap(writer, reader);
         }
         recordCount++;
       } catch (UserException e) {
@@ -214,6 +206,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
                 table.getPath(),
                 reader == null ? null : IdCodec.asString(reader.getId())))
             .build(logger);
+      } catch (SchemaChangeException e) {
+        if (ignoreSchemaChange) {
+          logger.warn("{}. Dropping the row from result.", e.getMessage());
+          logger.debug("Stack trace:", e);
+        } else {
+          throw dataReadError(e);
+        }
       }
     }
 
@@ -222,7 +221,25 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     return recordCount;
   }
 
-  private void writeToListOrMap(MapOrListWriterImpl writer, 
DBDocumentReaderBase reader) {
+  private void writeId(MapOrListWriterImpl writer, Value id) throws 
SchemaChangeException {
+    try {
+      switch(id.getType()) {
+      case STRING:
+        writeString(writer, ID_KEY, id.getString());
+        break;
+      case BINARY:
+        writeBinary(writer, ID_KEY, id.getBinary());
+        break;
+      default:
+        throw new UnsupportedOperationException(id.getType() +
+            " is not a supported type for _id field.");
+      }
+    } catch (IllegalStateException | IllegalArgumentException e) {
+      throw schemaChangeException(e, "Possible schema change at _id: '%s'", 
IdCodec.asString(id));
+    }
+  }
+
+  private void writeToListOrMap(MapOrListWriterImpl writer, 
DBDocumentReaderBase reader) throws SchemaChangeException {
     String fieldName = null;
     writer.start();
     outside: while (true) {
@@ -289,8 +306,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
           throw unsupportedError("Unsupported type: %s encountered during the 
query.", event);
         }
       } catch (IllegalStateException | IllegalArgumentException e) {
-        logger.warn(String.format("Possible schema change at _id: '%s', field: 
'%s'",
-            IdCodec.asString(reader.getId()), fieldName), e);
+        throw schemaChangeException(e, "Possible schema change at _id: '%s', 
field: '%s'", IdCodec.asString(reader.getId()), fieldName);
       }
     }
     writer.end();
@@ -410,12 +426,24 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
         .build(logger);
   }
 
+  private UserException dataReadError(Throwable t) {
+    return dataReadError(t, null);
+  }
+
   private UserException dataReadError(String format, Object... args) {
-    return UserException.dataReadError()
-        .message(String.format(format, args))
+    return dataReadError(null, format, args);
+  }
+
+  private UserException dataReadError(Throwable t, String format, Object... 
args) {
+    return UserException.dataReadError(t)
+        .message(format == null ? null : String.format(format, args))
         .build(logger);
   }
 
+  private SchemaChangeException schemaChangeException(Throwable t, String 
format, Object... args) {
+    return new SchemaChangeException(format, t, args);
+  }
+
   private DBDocumentReaderBase nextDocumentReader() {
     final OperatorStats operatorStats = operatorContext == null ? null : 
operatorContext.getStats();
     try {
@@ -434,7 +462,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
         }
       }
     } catch (DBException e) {
-      throw UserException.dataReadError(e).build(logger);
+      throw dataReadError(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/12cbd27e/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
index 811e245..f7c76b5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.mapr.streams;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -73,7 +74,7 @@ public class StreamsFormatPlugin extends TableFormatPlugin {
     List<String> files = selection.getFiles();
     assert (files.size() == 1);
     //TableProperties props = getMaprFS().getTableProperties(new 
Path(files.get(0)));
-    throw new UnsupportedOperationException("not implemented");
+    throw UserException.unsupportedError().message("MapR streams can not be 
querried at this time.").build(logger);
   }
 
 }

Reply via email to