This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b2eef98  DRILL-8041: Fix mongo scan spec deserialization (#2368)
b2eef98 is described below

commit b2eef98877807fd0fc7bb10bc49729c5475bc0df
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri Nov 12 18:42:42 2021 +0200

    DRILL-8041: Fix mongo scan spec deserialization (#2368)
---
 .../exec/store/mongo/BaseMongoSubScanSpec.java     |  9 +++++++-
 .../drill/exec/store/mongo/MongoGroupScan.java     | 27 +++++++++++-----------
 .../drill/exec/store/mongo/MongoRecordReader.java  | 11 +++++++--
 .../drill/exec/store/mongo/MongoScanSpec.java      |  7 ++----
 .../drill/exec/store/mongo/MongoSubScan.java       | 27 +++++++---------------
 .../store/mongo/plan/MongoPluginImplementor.java   | 20 ++++++++++++----
 .../drill/exec/store/mongo/TestMongoQueries.java   | 16 +++++++++++++
 7 files changed, 72 insertions(+), 45 deletions(-)

diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
index 4f0d1bf..bdf3286 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.mongo;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.SuperBuilder;
@@ -26,7 +28,12 @@ import java.util.List;
 
 @Getter
 @Setter
-@SuperBuilder(setterPrefix = "set")
+@SuperBuilder
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+  @JsonSubTypes.Type(MongoSubScan.ShardedMongoSubScanSpec.class),
+  @JsonSubTypes.Type(MongoSubScan.MongoSubScanSpec.class)
+})
 public class BaseMongoSubScanSpec {
 
   @JsonProperty
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b26b410..741d484 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.mongo;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -113,14 +112,14 @@ public class MongoGroupScan extends AbstractGroupScan implements
       @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JsonProperty("useAggregate") boolean useAggregate,
-      @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+      @JacksonInject StoragePluginRegistry pluginRegistry) {
     this(userName,
         pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
         scanSpec, columns, useAggregate);
   }
 
   public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
-      MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) throws IOException {
+      MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) {
     super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
@@ -350,7 +349,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
   public GroupScan clone(int maxRecords) {
     MongoGroupScan clone = new MongoGroupScan(this);
     clone.useAggregate = true;
-    clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords));
+    clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords).toJson());
     return clone;
   }
 
@@ -450,19 +449,19 @@ public class MongoGroupScan extends AbstractGroupScan implements
   private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
     if (useAggregate) {
       return MongoSubScanSpec.builder()
-          .setOperations(scanSpec.getOperations())
-          .setDbName(scanSpec.getDbName())
-          .setCollectionName(scanSpec.getCollectionName())
-          .setHosts(chunkInfo.getChunkLocList())
+          .operations(scanSpec.getOperations())
+          .dbName(scanSpec.getDbName())
+          .collectionName(scanSpec.getCollectionName())
+          .hosts(chunkInfo.getChunkLocList())
           .build();
     }
     return ShardedMongoSubScanSpec.builder()
-        .setMinFilters(chunkInfo.getMinFilters())
-        .setMaxFilters(chunkInfo.getMaxFilters())
-        .setFilter(scanSpec.getFilters())
-        .setDbName(scanSpec.getDbName())
-        .setCollectionName(scanSpec.getCollectionName())
-        .setHosts(chunkInfo.getChunkLocList())
+        .minFilters(chunkInfo.getMinFilters())
+        .maxFilters(chunkInfo.getMaxFilters())
+        .filter(scanSpec.getFilters())
+        .dbName(scanSpec.getDbName())
+        .collectionName(scanSpec.getCollectionName())
+        .hosts(chunkInfo.getChunkLocList())
         .build();
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 5ca1358..08219f6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -23,8 +23,10 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.mongodb.client.MongoIterable;
 import com.mongodb.client.model.Aggregates;
@@ -93,13 +95,18 @@ public class MongoRecordReader extends AbstractRecordReader {
     this.plugin = plugin;
     filters = new Document();
     if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
-      operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+      operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations().stream()
+        .map(BsonDocument::parse)
+        .collect(Collectors.toList());
     } else {
       MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec = (MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
       Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
           shardedMongoSubScanSpec.getMinFilters(), shardedMongoSubScanSpec.getMaxFilters());
 
-      buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+      Document pushdownFilters = Optional.ofNullable(shardedMongoSubScanSpec.getFilter())
+        .map(Document::parse)
+        .orElse(null);
+      buildFilters(pushdownFilters, mergedFilters);
     }
     enableAllTextMode = fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
     enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 7459bfb..a8de5c6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -22,9 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
-import org.bson.Document;
-
-import org.bson.conversions.Bson;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,9 +33,9 @@ public class MongoScanSpec {
   private final String dbName;
   private final String collectionName;
 
-  private Document filters;
+  private String filters;
 
-  private List<Bson> operations = new ArrayList<>();
+  private List<String> operations = new ArrayList<>();
 
   @JsonCreator
   public MongoScanSpec(@JsonProperty("dbName") String dbName,
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 975dc85..25d852a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -23,11 +23,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import lombok.Getter;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import lombok.extern.jackson.Jacksonized;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractBase;
@@ -42,8 +41,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.bson.Document;
-import org.bson.conversions.Bson;
 
 @JsonTypeName("mongo-shard-read")
 public class MongoSubScan extends AbstractBase implements SubScan {
@@ -64,8 +61,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
       @JsonProperty("userName") String userName,
       @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
       @JsonProperty("chunkScanSpecList") LinkedList<BaseMongoSubScanSpec> chunkScanSpecList,
-      @JsonProperty("columns") List<SchemaPath> columns)
-      throws ExecutionSetupException {
+      @JsonProperty("columns") List<SchemaPath> columns) {
     super(userName);
     this.columns = columns;
     this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
@@ -91,11 +87,6 @@ public class MongoSubScan extends AbstractBase implements SubScan {
   }
 
   @JsonIgnore
-  public MongoStoragePluginConfig getMongoPluginConfig() {
-    return mongoPluginConfig;
-  }
-
-  @JsonIgnore
   public MongoStoragePlugin getMongoStoragePlugin() {
     return mongoStoragePlugin;
   }
@@ -125,11 +116,10 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     return Collections.emptyIterator();
   }
 
-  @JsonTypeName("ShardedMongoSubScanSpec")
   @Getter
   @ToString
-  @SuperBuilder(setterPrefix = "set")
-  @JsonDeserialize(builder = ShardedMongoSubScanSpec.ShardedMongoSubScanSpecBuilder.class)
+  @Jacksonized
+  @SuperBuilder
   public static class ShardedMongoSubScanSpec extends BaseMongoSubScanSpec {
 
     @JsonProperty
@@ -139,19 +129,18 @@ public class MongoSubScan extends AbstractBase implements SubScan {
     private final Map<String, Object> maxFilters;
 
     @JsonProperty
-    private final Document filter;
+    private final String filter;
 
   }
 
-  @JsonTypeName("MongoSubScanSpec")
   @Getter
   @ToString
-  @SuperBuilder(setterPrefix = "set")
-  @JsonDeserialize(builder = MongoSubScanSpec.MongoSubScanSpecBuilder.class)
+  @Jacksonized
+  @SuperBuilder
   public static class MongoSubScanSpec extends BaseMongoSubScanSpec {
 
     @JsonProperty
-    private final List<Bson> operations;
+    private final List<String> operations;
 
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index 9176fe2..5fae342 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -66,6 +66,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -219,8 +220,12 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
   @Override
   public void implement(StoragePluginTableScan scan) throws IOException {
     groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
-    operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations());
-    filters = groupScan.getScanSpec().getFilters();
+    operations = this.groupScan.getScanSpec().getOperations().stream()
+      .map(BsonDocument::parse)
+      .collect(Collectors.toList());
+    filters = Optional.ofNullable(groupScan.getScanSpec().getFilters())
+      .map(Document::parse)
+      .orElse(null);
     columns = groupScan.getColumns();
   }
 
@@ -273,9 +278,16 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
   }
 
   @Override
-  public GroupScan getPhysicalOperator() throws IOException {
+  public GroupScan getPhysicalOperator() {
     MongoScanSpec scanSpec = groupScan.getScanSpec();
-    MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+    List<String> operations = this.operations.stream()
+      .map(op -> op.toBsonDocument().toJson())
+      .collect(Collectors.toList());
+    String filters = Optional.ofNullable(this.filters)
+      .map(Document::toJson)
+      .orElse(null);
+    MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(),
+      filters, operations);
     return new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
       newSpec, columns, runAggregate);
   }
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index 2c79dd8..7f2610f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.mongo;
 
 import org.apache.drill.categories.MongoStorageTest;
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.ExecConstants;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -53,6 +54,21 @@ public class TestMongoQueries extends MongoTestBase {
   }
 
   @Test
+  public void testFragmentSerDe() throws Exception {
+    client.alterSession(ExecConstants.SLICE_TARGET, 1);
+    try {
+      String plan = queryBuilder()
+        .sql(String.format("select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union " +
+          "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2 ", DONUTS_DB, DONUTS_COLLECTION))
+        .explainJson();
+
+      assertEquals(queryBuilder().physical(plan).run().recordCount(), 5);
+    } finally {
+      client.resetSession(ExecConstants.SLICE_TARGET);
+    }
+  }
+
+  @Test
   public void testWithANDOperator() throws Exception {
     testBuilder()
         .sqlQuery(String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE3, EMPLOYEE_DB, EMPINFO_COLLECTION))

Reply via email to