This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b2eef98 DRILL-8041: Fix mongo scan spec deserialization (#2368)
b2eef98 is described below
commit b2eef98877807fd0fc7bb10bc49729c5475bc0df
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri Nov 12 18:42:42 2021 +0200
DRILL-8041: Fix mongo scan spec deserialization (#2368)
---
.../exec/store/mongo/BaseMongoSubScanSpec.java | 9 +++++++-
.../drill/exec/store/mongo/MongoGroupScan.java | 27 +++++++++++-----------
.../drill/exec/store/mongo/MongoRecordReader.java | 11 +++++++--
.../drill/exec/store/mongo/MongoScanSpec.java | 7 ++----
.../drill/exec/store/mongo/MongoSubScan.java | 27 +++++++---------------
.../store/mongo/plan/MongoPluginImplementor.java | 20 ++++++++++++----
.../drill/exec/store/mongo/TestMongoQueries.java | 16 +++++++++++++
7 files changed, 72 insertions(+), 45 deletions(-)
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
index 4f0d1bf..bdf3286 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/BaseMongoSubScanSpec.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.store.mongo;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.SuperBuilder;
@@ -26,7 +28,12 @@ import java.util.List;
@Getter
@Setter
-@SuperBuilder(setterPrefix = "set")
+@SuperBuilder
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(MongoSubScan.ShardedMongoSubScanSpec.class),
+ @JsonSubTypes.Type(MongoSubScan.MongoSubScanSpec.class)
+})
public class BaseMongoSubScanSpec {
@JsonProperty
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
index b26b410..741d484 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.mongo;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -113,14 +112,14 @@ public class MongoGroupScan extends AbstractGroupScan implements
@JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("useAggregate") boolean useAggregate,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+ @JacksonInject StoragePluginRegistry pluginRegistry) {
this(userName,
pluginRegistry.resolve(storagePluginConfig, MongoStoragePlugin.class),
scanSpec, columns, useAggregate);
}
public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
- MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) throws IOException {
+ MongoScanSpec scanSpec, List<SchemaPath> columns, boolean useAggregate) {
super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
@@ -350,7 +349,7 @@ public class MongoGroupScan extends AbstractGroupScan implements
public GroupScan clone(int maxRecords) {
MongoGroupScan clone = new MongoGroupScan(this);
clone.useAggregate = true;
- clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords));
+ clone.getScanSpec().getOperations().add(new Document("$limit", maxRecords).toJson());
return clone;
}
@@ -450,19 +449,19 @@ public class MongoGroupScan extends AbstractGroupScan implements
private BaseMongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
if (useAggregate) {
return MongoSubScanSpec.builder()
- .setOperations(scanSpec.getOperations())
- .setDbName(scanSpec.getDbName())
- .setCollectionName(scanSpec.getCollectionName())
- .setHosts(chunkInfo.getChunkLocList())
+ .operations(scanSpec.getOperations())
+ .dbName(scanSpec.getDbName())
+ .collectionName(scanSpec.getCollectionName())
+ .hosts(chunkInfo.getChunkLocList())
.build();
}
return ShardedMongoSubScanSpec.builder()
- .setMinFilters(chunkInfo.getMinFilters())
- .setMaxFilters(chunkInfo.getMaxFilters())
- .setFilter(scanSpec.getFilters())
- .setDbName(scanSpec.getDbName())
- .setCollectionName(scanSpec.getCollectionName())
- .setHosts(chunkInfo.getChunkLocList())
+ .minFilters(chunkInfo.getMinFilters())
+ .maxFilters(chunkInfo.getMaxFilters())
+ .filter(scanSpec.getFilters())
+ .dbName(scanSpec.getDbName())
+ .collectionName(scanSpec.getCollectionName())
+ .hosts(chunkInfo.getChunkLocList())
.build();
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 5ca1358..08219f6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -23,8 +23,10 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.Aggregates;
@@ -93,13 +95,18 @@ public class MongoRecordReader extends AbstractRecordReader {
this.plugin = plugin;
filters = new Document();
if (subScanSpec instanceof MongoSubScan.MongoSubScanSpec) {
- operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations();
+ operations = ((MongoSubScan.MongoSubScanSpec) subScanSpec).getOperations().stream()
+ .map(BsonDocument::parse)
+ .collect(Collectors.toList());
} else {
MongoSubScan.ShardedMongoSubScanSpec shardedMongoSubScanSpec =
(MongoSubScan.ShardedMongoSubScanSpec) subScanSpec;
Map<String, List<Document>> mergedFilters = MongoUtils.mergeFilters(
shardedMongoSubScanSpec.getMinFilters(),
shardedMongoSubScanSpec.getMaxFilters());
- buildFilters(shardedMongoSubScanSpec.getFilter(), mergedFilters);
+ Document pushdownFilters = Optional.ofNullable(shardedMongoSubScanSpec.getFilter())
+ .map(Document::parse)
+ .orElse(null);
+ buildFilters(pushdownFilters, mergedFilters);
}
enableAllTextMode =
fragmentContext.getOptions().getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val;
enableNanInf =
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
index 7459bfb..a8de5c6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java
@@ -22,9 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
-import org.bson.Document;
-
-import org.bson.conversions.Bson;
import java.util.ArrayList;
import java.util.List;
@@ -36,9 +33,9 @@ public class MongoScanSpec {
private final String dbName;
private final String collectionName;
- private Document filters;
+ private String filters;
- private List<Bson> operations = new ArrayList<>();
+ private List<String> operations = new ArrayList<>();
@JsonCreator
public MongoScanSpec(@JsonProperty("dbName") String dbName,
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
index 975dc85..25d852a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java
@@ -23,11 +23,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import lombok.extern.jackson.Jacksonized;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractBase;
@@ -42,8 +41,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.bson.Document;
-import org.bson.conversions.Bson;
@JsonTypeName("mongo-shard-read")
public class MongoSubScan extends AbstractBase implements SubScan {
@@ -64,8 +61,7 @@ public class MongoSubScan extends AbstractBase implements SubScan {
@JsonProperty("userName") String userName,
@JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig,
@JsonProperty("chunkScanSpecList") LinkedList<BaseMongoSubScanSpec>
chunkScanSpecList,
- @JsonProperty("columns") List<SchemaPath> columns)
- throws ExecutionSetupException {
+ @JsonProperty("columns") List<SchemaPath> columns) {
super(userName);
this.columns = columns;
this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig;
@@ -91,11 +87,6 @@ public class MongoSubScan extends AbstractBase implements SubScan {
}
@JsonIgnore
- public MongoStoragePluginConfig getMongoPluginConfig() {
- return mongoPluginConfig;
- }
-
- @JsonIgnore
public MongoStoragePlugin getMongoStoragePlugin() {
return mongoStoragePlugin;
}
@@ -125,11 +116,10 @@ public class MongoSubScan extends AbstractBase implements SubScan {
return Collections.emptyIterator();
}
- @JsonTypeName("ShardedMongoSubScanSpec")
@Getter
@ToString
- @SuperBuilder(setterPrefix = "set")
- @JsonDeserialize(builder = ShardedMongoSubScanSpec.ShardedMongoSubScanSpecBuilder.class)
+ @Jacksonized
+ @SuperBuilder
public static class ShardedMongoSubScanSpec extends BaseMongoSubScanSpec {
@JsonProperty
@@ -139,19 +129,18 @@ public class MongoSubScan extends AbstractBase implements SubScan {
private final Map<String, Object> maxFilters;
@JsonProperty
- private final Document filter;
+ private final String filter;
}
- @JsonTypeName("MongoSubScanSpec")
@Getter
@ToString
- @SuperBuilder(setterPrefix = "set")
- @JsonDeserialize(builder = MongoSubScanSpec.MongoSubScanSpecBuilder.class)
+ @Jacksonized
+ @SuperBuilder
public static class MongoSubScanSpec extends BaseMongoSubScanSpec {
@JsonProperty
- private final List<Bson> operations;
+ private final List<String> operations;
}
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
index 9176fe2..5fae342 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
@@ -66,6 +66,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -219,8 +220,12 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
@Override
public void implement(StoragePluginTableScan scan) throws IOException {
groupScan = (MongoGroupScan)
Utilities.getDrillTable(scan.getTable()).getGroupScan();
- operations = new ArrayList<>(this.groupScan.getScanSpec().getOperations());
- filters = groupScan.getScanSpec().getFilters();
+ operations = this.groupScan.getScanSpec().getOperations().stream()
+ .map(BsonDocument::parse)
+ .collect(Collectors.toList());
+ filters = Optional.ofNullable(groupScan.getScanSpec().getFilters())
+ .map(Document::parse)
+ .orElse(null);
columns = groupScan.getColumns();
}
@@ -273,9 +278,16 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
}
@Override
- public GroupScan getPhysicalOperator() throws IOException {
+ public GroupScan getPhysicalOperator() {
MongoScanSpec scanSpec = groupScan.getScanSpec();
- MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(), filters, operations);
+ List<String> operations = this.operations.stream()
+ .map(op -> op.toBsonDocument().toJson())
+ .collect(Collectors.toList());
+ String filters = Optional.ofNullable(this.filters)
+ .map(Document::toJson)
+ .orElse(null);
+ MongoScanSpec newSpec = new MongoScanSpec(scanSpec.getDbName(), scanSpec.getCollectionName(),
+ filters, operations);
return new MongoGroupScan(groupScan.getUserName(),
groupScan.getStoragePlugin(),
newSpec, columns, runAggregate);
}
diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
index 2c79dd8..7f2610f 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.mongo;
import org.apache.drill.categories.MongoStorageTest;
import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.ExecConstants;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -53,6 +54,21 @@ public class TestMongoQueries extends MongoTestBase {
}
@Test
+ public void testFragmentSerDe() throws Exception {
+ client.alterSession(ExecConstants.SLICE_TARGET, 1);
+ try {
+ String plan = queryBuilder()
+ .sql(String.format("select t1.id as id, t1.name from mongo.%1$s.`%2$s` t1 where t1.name = 'Cake' union " +
+ "select t2.id as id, t2.name from mongo.%1$s.`%2$s` t2 ", DONUTS_DB, DONUTS_COLLECTION))
+ .explainJson();
+
+ assertEquals(queryBuilder().physical(plan).run().recordCount(), 5);
+ } finally {
+ client.resetSession(ExecConstants.SLICE_TARGET);
+ }
+ }
+
+ @Test
public void testWithANDOperator() throws Exception {
testBuilder()
.sqlQuery(String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE3,
EMPLOYEE_DB, EMPINFO_COLLECTION))