DRILL-98: MongoDB storage plugin

This commit disables MongoDB PStore due to changes to the PStore interface.
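For reviewers, a minimal usage sketch of the connection cache this patch adds (MongoCnxnManager, below in the diff). The sketch class name, host, port, and database name are illustrative placeholders only, not values taken from the patch:

    import java.net.UnknownHostException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.drill.exec.store.mongo.MongoCnxnManager;

    import com.mongodb.DB;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientOptions;
    import com.mongodb.ServerAddress;

    public class MongoCnxnManagerSketch {
      public static void main(String[] args) throws UnknownHostException {
        // Placeholder seed list; the plugin builds this from the connection
        // URI or from the shard/chunk metadata.
        List<ServerAddress> seeds = Arrays.asList(
            new ServerAddress("localhost", 27017));
        MongoClientOptions options = MongoClientOptions.builder().build();

        // Clients are cached keyed on the first ServerAddress: repeated calls
        // with the same seed reuse one MongoClient instead of reconnecting.
        MongoClient client = MongoCnxnManager.getClient(seeds, options);
        DB db = client.getDB("test");
        System.out.println(db.getCollectionNames());
      }
    }

The cache holds at most five clients and closes a client ten minutes after its last use (see the removal listener in MongoCnxnManager).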
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2ca9c907 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2ca9c907 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2ca9c907 Branch: refs/heads/master Commit: 2ca9c907bff639e08a561eac32e0acab3a0b3304 Parents: 786fd36 Author: Kamesh <kam.iit...@gmail.com> Authored: Sat Sep 27 18:35:28 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Sun Sep 28 00:10:44 2014 -0700 ---------------------------------------------------------------------- contrib/pom.xml | 1 + contrib/storage-mongo/pom.xml | 85 +++ .../exec/store/mongo/DrillMongoConstants.java | 45 ++ .../exec/store/mongo/MongoCnxnManager.java | 73 +++ .../mongo/MongoCompareFunctionProcessor.java | 255 +++++++++ .../exec/store/mongo/MongoFilterBuilder.java | 219 ++++++++ .../drill/exec/store/mongo/MongoGroupScan.java | 538 +++++++++++++++++++ .../store/mongo/MongoPushDownFilterForScan.java | 103 ++++ .../exec/store/mongo/MongoRecordReader.java | 302 +++++++++++ .../exec/store/mongo/MongoScanBatchCreator.java | 68 +++ .../drill/exec/store/mongo/MongoScanSpec.java | 62 +++ .../exec/store/mongo/MongoStoragePlugin.java | 88 +++ .../store/mongo/MongoStoragePluginConfig.java | 72 +++ .../drill/exec/store/mongo/MongoSubScan.java | 214 ++++++++ .../drill/exec/store/mongo/MongoUtils.java | 89 +++ .../exec/store/mongo/common/ChunkInfo.java | 68 +++ .../exec/store/mongo/common/MongoCompareOp.java | 34 ++ .../exec/store/mongo/config/MongoPStore.java | 190 +++++++ .../store/mongo/config/MongoPStoreProvider.java | 76 +++ .../store/mongo/schema/MongoDatabaseSchema.java | 59 ++ .../store/mongo/schema/MongoSchemaFactory.java | 189 +++++++ .../resources/bootstrap-storage-plugins.json | 9 + .../src/main/resources/drill-module.conf | 28 + .../store/mongo/TestMongoChunkAssignment.java | 274 ++++++++++ distribution/pom.xml | 5 + distribution/src/assemble/bin.xml | 1 + .../org/apache/drill/exec/ExecConstants.java | 2 + .../server/options/SystemOptionManager.java | 1 + .../exec/vector/complex/fn/JsonReader.java | 16 +- .../vector/complex/fn/JsonReaderWithState.java | 22 + 30 files changed, 3186 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 728038a..cbb5598 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -34,6 +34,7 @@ <modules> <module>storage-hbase</module> <module>storage-hive</module> + <module>storage-mongo</module> <module>sqlline</module> <module>data</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml new file mode 100644 index 0000000..720bf1e --- /dev/null +++ b/contrib/storage-mongo/pom.xml @@ -0,0 +1,85 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>drill-contrib-parent</artifactId> + <groupId>org.apache.drill.contrib</groupId> + <version>0.6.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>drill-mongo-storage</artifactId> + + <name>contrib/mongo-storage-plugin</name> + + <dependencies> + <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>drill-java-exec</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>2.12.2</version> + <scope>compile</scope> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.drill.exec</groupId> + <artifactId>drill-java-exec</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.drill</groupId> + <artifactId>drill-common</artifactId> + <classifier>tests</classifier> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>2.1.1</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <property> + <name>logback.log.dir</name> + <value>${project.build.directory}/surefire-reports</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java new file mode 100644 index 0000000..8261e2e --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/DrillMongoConstants.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo; + +public interface DrillMongoConstants { + + public static final String SYS_STORE_PROVIDER_MONGO_URL = "drill.exec.sys.store.provider.mongo.url"; + + public static final String ID = "_id"; + + public static final String SHARDS = "shards"; + + public static final String NS = "ns"; + + public static final String SHARD = "shard"; + + public static final String HOST = "host"; + + public static final String CHUNKS = "chunks"; + + public static final String SIZE = "size"; + + public static final String COUNT = "count"; + + public static final String CONFIG = "config"; + + public static final String MIN = "min"; + + public static final String MAX = "max"; +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java new file mode 100644 index 0000000..a4482dd --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mongo; + +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.ServerAddress; + +public class MongoCnxnManager { + + private static final Logger logger = LoggerFactory + .getLogger(MongoCnxnManager.class); + private static Cache<ServerAddress, MongoClient> addressClientMap; + + static { + addressClientMap = CacheBuilder.newBuilder().maximumSize(5) + .expireAfterAccess(10, TimeUnit.MINUTES) + .removalListener(new AddressCloser()).build(); + } + + private static class AddressCloser implements + RemovalListener<ServerAddress, MongoClient> { + @Override + public synchronized void onRemoval( + RemovalNotification<ServerAddress, MongoClient> removal) { + removal.getValue().close(); + logger.debug("Closed connection to {}.", removal.getKey().toString()); + } + } + + public static synchronized MongoClient getClient( + List<ServerAddress> addresses, MongoClientOptions clientOptions) + throws UnknownHostException { + // Take the first replica from the replicated servers + ServerAddress serverAddress = addresses.get(0); + MongoClient client = addressClientMap.getIfPresent(serverAddress); + if (client == null) { + client = new MongoClient(addresses, clientOptions); + addressClientMap.put(serverAddress, client); + logger.debug("Created connection to {}.", serverAddress.toString()); + logger.debug("Number of connections opened is {}.", + addressClientMap.size()); + } + return client; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java new file mode 100644 index 0000000..d588f1e --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCompareFunctionProcessor.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.exec.store.mongo; + +import org.apache.drill.common.expression.CastExpression; +import org.apache.drill.common.expression.ConvertExpression; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; +import org.apache.drill.common.expression.ValueExpressions.DateExpression; +import org.apache.drill.common.expression.ValueExpressions.DoubleExpression; +import org.apache.drill.common.expression.ValueExpressions.FloatExpression; +import org.apache.drill.common.expression.ValueExpressions.IntExpression; +import org.apache.drill.common.expression.ValueExpressions.LongExpression; +import org.apache.drill.common.expression.ValueExpressions.QuotedString; +import org.apache.drill.common.expression.ValueExpressions.TimeExpression; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class MongoCompareFunctionProcessor extends + AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> { + private Object value; + private boolean success; + private boolean isEqualityFn; + private SchemaPath path; + private String functionName; + + public static boolean isCompareFunction(String functionName) { + return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName); + } + + public static MongoCompareFunctionProcessor process(FunctionCall call) { + String functionName = call.getName(); + LogicalExpression nameArg = call.args.get(0); + LogicalExpression valueArg = call.args.size() == 2 ? call.args.get(1) + : null; + MongoCompareFunctionProcessor evaluator = new MongoCompareFunctionProcessor( + functionName); + + if (valueArg != null) { // binary function + if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) { + LogicalExpression swapArg = valueArg; + valueArg = nameArg; + nameArg = swapArg; + evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP + .get(functionName); + } + evaluator.success = nameArg.accept(evaluator, valueArg); + } else if (call.args.get(0) instanceof SchemaPath) { + evaluator.success = true; + evaluator.path = (SchemaPath) nameArg; + } + + return evaluator; + } + + public MongoCompareFunctionProcessor(String functionName) { + this.success = false; + this.functionName = functionName; + this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP + .containsKey(functionName) + && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals( + functionName); + } + + public Object getValue() { + return value; + } + + public boolean isSuccess() { + return success; + } + + public SchemaPath getPath() { + return path; + } + + public String getFunctionName() { + return functionName; + } + + @Override + public Boolean visitCastExpression(CastExpression e, + LogicalExpression valueArg) throws RuntimeException { + if (e.getInput() instanceof CastExpression + || e.getInput() instanceof SchemaPath) { + return e.getInput().accept(this, valueArg); + } + return false; + } + + @Override + public Boolean visitConvertExpression(ConvertExpression e, + LogicalExpression valueArg) throws RuntimeException { + if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM + && e.getInput() instanceof SchemaPath) { + String encodingType = e.getEncodingType(); + switch (encodingType) { + case "INT_BE": + case "INT": + case "UINT_BE": + case "UINT": + case 
"UINT4_BE": + case "UINT4": + if (valueArg instanceof IntExpression + && (isEqualityFn || encodingType.startsWith("U"))) { + this.value = ((IntExpression) valueArg).getInt(); + } + break; + case "BIGINT_BE": + case "BIGINT": + case "UINT8_BE": + case "UINT8": + if (valueArg instanceof LongExpression + && (isEqualityFn || encodingType.startsWith("U"))) { + this.value = ((LongExpression) valueArg).getLong(); + } + break; + case "FLOAT": + if (valueArg instanceof FloatExpression && isEqualityFn) { + this.value = ((FloatExpression) valueArg).getFloat(); + } + break; + case "DOUBLE": + if (valueArg instanceof DoubleExpression && isEqualityFn) { + this.value = ((DoubleExpression) valueArg).getDouble(); + } + break; + case "TIME_EPOCH": + case "TIME_EPOCH_BE": + if (valueArg instanceof TimeExpression) { + this.value = ((TimeExpression) valueArg).getTime(); + } + break; + case "DATE_EPOCH": + case "DATE_EPOCH_BE": + if (valueArg instanceof DateExpression) { + this.value = ((DateExpression) valueArg).getDate(); + } + break; + case "BOOLEAN_BYTE": + if (valueArg instanceof BooleanExpression) { + this.value = ((BooleanExpression) valueArg).getBoolean(); + } + break; + case "UTF8": + // let visitSchemaPath() handle this. + return e.getInput().accept(this, valueArg); + } + + if (value != null) { + this.path = (SchemaPath) e.getInput(); + return true; + } + } + return false; + } + + @Override + public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) + throws RuntimeException { + return false; + } + + @Override + public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) + throws RuntimeException { + if (valueArg instanceof QuotedString) { + this.value = ((QuotedString) valueArg).value; + this.path = path; + return true; + } + + if (valueArg instanceof IntExpression) { + this.value = ((IntExpression) valueArg).getInt(); + this.path = path; + return true; + } + + if (valueArg instanceof LongExpression) { + this.value = ((LongExpression) valueArg).getLong(); + this.path = path; + return true; + } + + if (valueArg instanceof FloatExpression) { + this.value = ((FloatExpression) valueArg).getFloat(); + this.path = path; + return true; + } + + if (valueArg instanceof DoubleExpression) { + this.value = ((DoubleExpression) valueArg).getDouble(); + this.path = path; + return true; + } + + if (valueArg instanceof BooleanExpression) { + this.value = ((BooleanExpression) valueArg).getBoolean(); + this.path = path; + return true; + } + + return false; + } + + private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES; + static { + ImmutableSet.Builder<Class<? 
extends LogicalExpression>> builder = ImmutableSet + .builder(); + VALUE_EXPRESSION_CLASSES = builder.add(BooleanExpression.class) + .add(DateExpression.class).add(DoubleExpression.class) + .add(FloatExpression.class).add(IntExpression.class) + .add(LongExpression.class).add(QuotedString.class) + .add(TimeExpression.class).build(); + } + + private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP; + static { + ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); + COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder + // unary functions + .put("isnotnull", "isnotnull") + .put("isNotNull", "isNotNull") + .put("is not null", "is not null") + .put("isnull", "isnull") + .put("isNull", "isNull") + .put("is null", "is null") + // binary functions + .put("equal", "equal").put("not_equal", "not_equal") + .put("greater_than_or_equal_to", "less_than_or_equal_to") + .put("greater_than", "less_than") + .put("less_than_or_equal_to", "greater_than_or_equal_to") + .put("less_than", "greater_than").build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java new file mode 100644 index 0000000..def793a --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoFilterBuilder.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mongo; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.common.expression.BooleanOperator; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.drill.exec.store.mongo.common.MongoCompareOp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; +import com.mongodb.BasicDBObject; + +public class MongoFilterBuilder extends + AbstractExprVisitor<MongoScanSpec, Void, RuntimeException> implements + DrillMongoConstants { + static final Logger logger = LoggerFactory + .getLogger(MongoFilterBuilder.class); + final MongoGroupScan groupScan; + final LogicalExpression le; + private boolean allExpressionsConverted = true; + + public MongoFilterBuilder(MongoGroupScan groupScan, + LogicalExpression conditionExp) { + this.groupScan = groupScan; + this.le = conditionExp; + } + + public MongoScanSpec parseTree() { + MongoScanSpec parsedSpec = le.accept(this, null); + if (parsedSpec != null) { + parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getScanSpec(), + parsedSpec); + } + return parsedSpec; + } + + private MongoScanSpec mergeScanSpecs(String functionName, + MongoScanSpec leftScanSpec, MongoScanSpec rightScanSpec) { + BasicDBObject newFilter = null; + + switch (functionName) { + case "booleanAnd": + if (leftScanSpec.getFilters() != null + && rightScanSpec.getFilters() != null) { + newFilter = MongoUtils.andFilterAtIndex(leftScanSpec.getFilters(), + rightScanSpec.getFilters()); + } else if (leftScanSpec.getFilters() != null) { + newFilter = leftScanSpec.getFilters(); + } else { + newFilter = rightScanSpec.getFilters(); + } + break; + case "booleanOr": + newFilter = MongoUtils.orFilterAtIndex(leftScanSpec.getFilters(), + rightScanSpec.getFilters()); + } + return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan + .getScanSpec().getCollectionName(), newFilter); + } + + public boolean isAllExpressionsConverted() { + return allExpressionsConverted; + } + + @Override + public MongoScanSpec visitUnknown(LogicalExpression e, Void value) + throws RuntimeException { + allExpressionsConverted = false; + return null; + } + + @Override + public MongoScanSpec visitBooleanOperator(BooleanOperator op, Void value) { + List<LogicalExpression> args = op.args; + MongoScanSpec nodeScanSpec = null; + String functionName = op.getName(); + for (int i = 0; i < args.size(); ++i) { + switch (functionName) { + case "booleanAnd": + case "booleanOr": + if (nodeScanSpec == null) { + nodeScanSpec = args.get(i).accept(this, null); + } else { + MongoScanSpec scanSpec = args.get(i).accept(this, null); + if (scanSpec != null) { + nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec); + } else { + allExpressionsConverted = false; + } + } + break; + } + } + return nodeScanSpec; + } + + @Override + public MongoScanSpec visitFunctionCall(FunctionCall call, Void value) + throws RuntimeException { + MongoScanSpec nodeScanSpec = null; + String functionName = call.getName(); + ImmutableList<LogicalExpression> args = call.args; + + if (MongoCompareFunctionProcessor.isCompareFunction(functionName)) { + MongoCompareFunctionProcessor processor = MongoCompareFunctionProcessor + .process(call); + if (processor.isSuccess()) { + try { + nodeScanSpec = 
createMongoScanSpec(processor.getFunctionName(), + processor.getPath(), processor.getValue()); + } catch (Exception e) { + logger.error("Failed to create filter", e); + // throw new RuntimeException(e.getMessage(), e); + } + } + } else { + switch (functionName) { + case "booleanAnd": + case "booleanOr": + MongoScanSpec leftScanSpec = args.get(0).accept(this, null); + MongoScanSpec rightScanSpec = args.get(1).accept(this, null); + if (leftScanSpec != null && rightScanSpec != null) { + nodeScanSpec = mergeScanSpecs(functionName, leftScanSpec, + rightScanSpec); + } else { + allExpressionsConverted = false; + if ("booleanAnd".equals(functionName)) { + nodeScanSpec = leftScanSpec == null ? rightScanSpec : leftScanSpec; + } + } + break; + } + } + + if (nodeScanSpec == null) { + allExpressionsConverted = false; + } + + return nodeScanSpec; + } + + private MongoScanSpec createMongoScanSpec(String functionName, + SchemaPath field, Object fieldValue) throws ClassNotFoundException, + IOException { + // extract the field name + String fieldName = field.getAsUnescapedPath(); + MongoCompareOp compareOp = null; + switch (functionName) { + case "equal": + compareOp = MongoCompareOp.EQUAL; + break; + case "not_equal": + compareOp = MongoCompareOp.NOT_EQUAL; + break; + case "greater_than_or_equal_to": + compareOp = MongoCompareOp.GREATER_OR_EQUAL; + break; + case "greater_than": + compareOp = MongoCompareOp.GREATER; + break; + case "less_than_or_equal_to": + compareOp = MongoCompareOp.LESS_OR_EQUAL; + break; + case "less_than": + compareOp = MongoCompareOp.LESS; + break; + case "isnull": + case "isNull": + case "is null": + compareOp = MongoCompareOp.IFNULL; + break; + case "isnotnull": + case "isNotNull": + case "is not null": + compareOp = MongoCompareOp.IFNOTNULL; + break; + } + + if (compareOp != null) { + BasicDBObject queryFilter = new BasicDBObject(); + if (compareOp == MongoCompareOp.IFNULL) { + queryFilter.put(fieldName, + new BasicDBObject(MongoCompareOp.EQUAL.getCompareOp(), null)); + } else if (compareOp == MongoCompareOp.IFNOTNULL) { + queryFilter.put(fieldName, + new BasicDBObject(MongoCompareOp.NOT_EQUAL.getCompareOp(), null)); + } else { + queryFilter.put(fieldName, new BasicDBObject(compareOp.getCompareOp(), + fieldValue)); + } + return new MongoScanSpec(groupScan.getScanSpec().getDbName(), groupScan + .getScanSpec().getCollectionName(), queryFilter); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java new file mode 100644 index 0000000..ccce3d5 --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java @@ -0,0 +1,538 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec; +import org.apache.drill.exec.store.mongo.common.ChunkInfo; +import org.bson.types.MaxKey; +import org.bson.types.MinKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.mongodb.BasicDBObject; +import com.mongodb.CommandResult; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; + +@JsonTypeName("mongo-scan") +public class MongoGroupScan extends AbstractGroupScan implements + DrillMongoConstants { + + private static final Integer select = Integer.valueOf(1); + + static final Logger logger = LoggerFactory.getLogger(MongoGroupScan.class); + + private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MongoSubScanSpec>>() { + @Override + public int compare(List<MongoSubScanSpec> list1, + List<MongoSubScanSpec> list2) { + return list1.size() - list2.size(); + } + }; + + private static final Comparator<List<MongoSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections + .reverseOrder(LIST_SIZE_COMPARATOR); + + private MongoStoragePlugin storagePlugin; + + private
MongoStoragePluginConfig storagePluginConfig; + + private MongoScanSpec scanSpec; + + private List<SchemaPath> columns; + + private Map<Integer, List<MongoSubScanSpec>> endpointFragmentMapping; + + // Sharding with replica sets contains all the replica server addresses for + // each chunk. + private Map<String, Set<ServerAddress>> chunksMapping; + + private Map<String, List<ChunkInfo>> chunksInverseMapping; + + private Stopwatch watch = new Stopwatch(); + + private boolean filterPushedDown = false; + + @JsonCreator + public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec, + @JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig, + @JsonProperty("columns") List<SchemaPath> columns, + @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, + ExecutionSetupException { + this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), + scanSpec, columns); + } + + public MongoGroupScan(MongoStoragePlugin storagePlugin, + MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException { + this.storagePlugin = storagePlugin; + this.storagePluginConfig = storagePlugin.getConfig(); + this.scanSpec = scanSpec; + this.columns = columns; + this.storagePluginConfig.getConnection(); + init(); + } + + /** + * Private constructor, used for cloning. + * @param that + * The MongoGroupScan to clone + */ + private MongoGroupScan(MongoGroupScan that) { + this.scanSpec = that.scanSpec; + this.columns = that.columns; + this.storagePlugin = that.storagePlugin; + this.storagePluginConfig = that.storagePluginConfig; + this.chunksMapping = that.chunksMapping; + this.chunksInverseMapping = that.chunksInverseMapping; + this.endpointFragmentMapping = that.endpointFragmentMapping; + this.filterPushedDown = that.filterPushedDown; + } + + @JsonIgnore + public boolean isFilterPushedDown() { + return filterPushedDown; + } + + @JsonIgnore + public void setFilterPushedDown(boolean filterPushedDown) { + this.filterPushedDown = filterPushedDown; + } + + private boolean isShardedCluster(MongoClient client) { + // need to check better way of identifying + List<String> databaseNames = client.getDatabaseNames(); + return databaseNames.contains(CONFIG); + } + + @SuppressWarnings("rawtypes") + private void init() throws IOException { + MongoClient client = null; + try { + MongoClientURI clientURI = new MongoClientURI( + this.storagePluginConfig.getConnection()); + client = new MongoClient(clientURI); + + chunksMapping = Maps.newHashMap(); + chunksInverseMapping = Maps.newLinkedHashMap(); + if (isShardedCluster(client)) { + DB db = client.getDB(CONFIG); + db.setReadPreference(ReadPreference.nearest()); + DBCollection chunksCollection = db.getCollectionFromString(CHUNKS); + + DBObject query = new BasicDBObject(1); + query + .put( + NS, + this.scanSpec.getDbName() + "." 
+ + this.scanSpec.getCollectionName()); + + DBObject fields = new BasicDBObject(); + fields.put(SHARD, select); + fields.put(MIN, select); + fields.put(MAX, select); + + DBCursor chunkCursor = chunksCollection.find(query, fields); + + DBCollection shardsCollection = db.getCollectionFromString(SHARDS); + + fields = new BasicDBObject(); + fields.put(HOST, select); + + while (chunkCursor.hasNext()) { + DBObject chunkObj = chunkCursor.next(); + String shardName = (String) chunkObj.get(SHARD); + String chunkId = (String) chunkObj.get(ID); + query = new BasicDBObject().append(ID, shardName); + DBCursor hostCursor = shardsCollection.find(query, fields); + while (hostCursor.hasNext()) { + DBObject hostObj = hostCursor.next(); + String hostEntry = (String) hostObj.get(HOST); + String[] tagAndHost = StringUtils.split(hostEntry, '/'); + String[] hosts = tagAndHost.length > 1 ? StringUtils.split( + tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ','); + Set<ServerAddress> addressList = chunksMapping.get(chunkId); + if (addressList == null) { + addressList = Sets.newHashSet(); + chunksMapping.put(chunkId, addressList); + } + for (String host : hosts) { + addressList.add(new ServerAddress(host)); + } + ServerAddress address = addressList.iterator().next(); + + List<ChunkInfo> chunkList = chunksInverseMapping.get(address + .getHost()); + if (chunkList == null) { + chunkList = Lists.newArrayList(); + chunksInverseMapping.put(address.getHost(), chunkList); + } + ChunkInfo chunkInfo = new ChunkInfo(Arrays.asList(hosts), chunkId); + DBObject minObj = (BasicDBObject) chunkObj.get(MIN); + + Map<String, Object> minFilters = Maps.newHashMap(); + Map minMap = minObj.toMap(); + Set keySet = minMap.keySet(); + for (Object keyObj : keySet) { + Object object = minMap.get(keyObj); + if (!(object instanceof MinKey)) { + minFilters.put(keyObj.toString(), object); + } + } + chunkInfo.setMinFilters(minFilters); + + DBObject maxObj = (BasicDBObject) chunkObj.get(MAX); + Map<String, Object> maxFilters = Maps.newHashMap(); + Map maxMap = maxObj.toMap(); + keySet = maxMap.keySet(); + for (Object keyObj : keySet) { + Object object = maxMap.get(keyObj); + if (!(object instanceof MaxKey)) { + maxFilters.put(keyObj.toString(), object); + } + } + + chunkInfo.setMaxFilters(maxFilters); + chunkList.add(chunkInfo); + } + } + } else { + String chunkName = scanSpec.getDbName() + "." 
+ + scanSpec.getCollectionName(); + List<String> hosts = clientURI.getHosts(); + Set<ServerAddress> addressList = Sets.newHashSet(); + + for (String host : hosts) { + addressList.add(new ServerAddress(host)); + } + chunksMapping.put(chunkName, addressList); + + String host = hosts.get(0); + ServerAddress address = new ServerAddress(host); + ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName); + chunkInfo.setMinFilters(Collections.<String, Object> emptyMap()); + chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap()); + List<ChunkInfo> chunksList = Lists.newArrayList(); + chunksList.add(chunkInfo); + chunksInverseMapping.put(address.getHost(), chunksList); + } + } catch (UnknownHostException e) { + throw new DrillRuntimeException(e.getMessage(), e); + } finally { + if (client != null) { + client.close(); + } + } + + } + + @Override + public GroupScan clone(List<SchemaPath> columns) { + MongoGroupScan clone = new MongoGroupScan(this); + clone.columns = columns; + return clone; + } + + @Override + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; + } + + @Override + public void applyAssignments(List<DrillbitEndpoint> endpoints) + throws PhysicalOperatorSetupException { + logger.debug("Incoming endpoints :" + endpoints); + watch.reset(); + watch.start(); + + final int numSlots = endpoints.size(); + int totalAssignmentsTobeDone = chunksMapping.size(); + + Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone, String + .format("Incoming endpoints %d is greater than number of chunks %d", + numSlots, totalAssignmentsTobeDone)); + + final int minPerEndpointSlot = (int) Math + .floor((double) totalAssignmentsTobeDone / numSlots); + final int maxPerEndpointSlot = (int) Math + .ceil((double) totalAssignmentsTobeDone / numSlots); + + endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots); + Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap(); + + for (int i = 0; i < numSlots; ++i) { + endpointFragmentMapping.put(i, new ArrayList<MongoSubScanSpec>( + maxPerEndpointSlot)); + String hostname = endpoints.get(i).getAddress(); + Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname); + if (hostIndexQueue == null) { + hostIndexQueue = Lists.newLinkedList(); + endpointHostIndexListMap.put(hostname, hostIndexQueue); + } + hostIndexQueue.add(i); + } + + Set<Entry<String, List<ChunkInfo>>> chunksToAssignSet = Sets + .newHashSet(chunksInverseMapping.entrySet()); + + for (Iterator<Entry<String, List<ChunkInfo>>> chunksIterator = chunksToAssignSet + .iterator(); chunksIterator.hasNext();) { + Entry<String, List<ChunkInfo>> chunkEntry = chunksIterator.next(); + Queue<Integer> slots = endpointHostIndexListMap.get(chunkEntry.getKey()); + if (slots != null) { + for (ChunkInfo chunkInfo : chunkEntry.getValue()) { + Integer slotIndex = slots.poll(); + List<MongoSubScanSpec> subScanSpecList = endpointFragmentMapping + .get(slotIndex); + subScanSpecList.add(buildSubScanSpecAndGet(chunkInfo)); + slots.offer(slotIndex); + } + chunksIterator.remove(); + } + } + + PriorityQueue<List<MongoSubScanSpec>> minHeap = new PriorityQueue<List<MongoSubScanSpec>>( + numSlots, LIST_SIZE_COMPARATOR); + PriorityQueue<List<MongoSubScanSpec>> maxHeap = new PriorityQueue<List<MongoSubScanSpec>>( + numSlots, LIST_SIZE_COMPARATOR_REV); + for (List<MongoSubScanSpec> listOfScan : endpointFragmentMapping.values()) { + if (listOfScan.size() < minPerEndpointSlot) { + minHeap.offer(listOfScan); + } else if (listOfScan.size() > minPerEndpointSlot) { + 
maxHeap.offer(listOfScan); + } + } + + if (chunksToAssignSet.size() > 0) { + for (Entry<String, List<ChunkInfo>> chunkEntry : chunksToAssignSet) { + for (ChunkInfo chunkInfo : chunkEntry.getValue()) { + List<MongoSubScanSpec> smallestList = minHeap.poll(); + smallestList.add(buildSubScanSpecAndGet(chunkInfo)); + minHeap.offer(smallestList); + } + } + } + + while (minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) { + List<MongoSubScanSpec> smallestList = minHeap.poll(); + List<MongoSubScanSpec> largestList = maxHeap.poll(); + smallestList.add(largestList.remove(largestList.size() - 1)); + if (largestList.size() > minPerEndpointSlot) { + maxHeap.offer(largestList); + } + if (smallestList.size() < minPerEndpointSlot) { + minHeap.offer(smallestList); + } + } + + logger.debug( + "Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}", + watch.elapsed(TimeUnit.NANOSECONDS) / 1000, endpoints, + endpointFragmentMapping.toString()); + } + + private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) { + MongoSubScanSpec subScanSpec = new MongoSubScanSpec() + .setDbName(scanSpec.getDbName()) + .setCollectionName(scanSpec.getCollectionName()) + .setHosts(chunkInfo.getChunkLocList()) + .setMinFilters(chunkInfo.getMinFilters()) + .setMaxFilters(chunkInfo.getMaxFilters()) + .setFilter(scanSpec.getFilters()); + return subScanSpec; + } + + @Override + public MongoSubScan getSpecificScan(int minorFragmentId) + throws ExecutionSetupException { + return new MongoSubScan(storagePlugin, storagePluginConfig, + endpointFragmentMapping.get(minorFragmentId), columns); + } + + @Override + public int getMaxParallelizationWidth() { + return chunksMapping.size(); + } + + @Override + public String getDigest() { + return toString(); + } + + @Override + public ScanStats getScanStats() { + MongoClientURI clientURI = new MongoClientURI( + this.storagePluginConfig.getConnection()); + try { + List<String> hosts = clientURI.getHosts(); + List<ServerAddress> addresses = Lists.newArrayList(); + for (String host : hosts) { + addresses.add(new ServerAddress(host)); + } + MongoClient client = MongoCnxnManager.getClient(addresses, + clientURI.getOptions()); + DB db = client.getDB(scanSpec.getDbName()); + db.setReadPreference(ReadPreference.nearest()); + DBCollection collection = db.getCollectionFromString(scanSpec + .getCollectionName()); + CommandResult stats = collection.getStats(); + return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, + stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE)); + } catch (Exception e) { + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + return new MongoGroupScan(this); + } + + @Override + public List<EndpointAffinity> getOperatorAffinity() { + watch.reset(); + watch.start(); + + Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap(); + for (DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) { + endpointMap.put(endpoint.getAddress(), endpoint); + logger.debug("Endpoint address: {}", endpoint.getAddress()); + } + + Map<DrillbitEndpoint, EndpointAffinity> affinityMap = Maps.newHashMap(); + // As of now, considering only the first replica, though there may be + // multiple replicas for each chunk. 
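+    // Each chunk contributes one unit of affinity to the first of its replica hosts that is also running a drillbit.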
+ for (Set<ServerAddress> addressList : chunksMapping.values()) { + // Each replica can be on multiple machines, take the first one, which + // meets affinity. + for (ServerAddress address : addressList) { + DrillbitEndpoint ep = endpointMap.get(address.getHost()); + if (ep != null) { + EndpointAffinity affinity = affinityMap.get(ep); + if (affinity == null) { + affinityMap.put(ep, new EndpointAffinity(ep, 1)); + } else { + affinity.addAffinity(1); + } + break; + } + } + } + logger.debug("Took {} µs to get operator affinity", + watch.elapsed(TimeUnit.NANOSECONDS) / 1000); + logger.debug("Affined drillbits : " + affinityMap.values()); + return Lists.newArrayList(affinityMap.values()); + } + + @JsonProperty + public List<SchemaPath> getColumns() { + return columns; + } + + @JsonProperty("mongoScanSpec") + public MongoScanSpec getScanSpec() { + return scanSpec; + } + + @JsonProperty("storage") + public MongoStoragePluginConfig getStorageConfig() { + return storagePluginConfig; + } + + @JsonIgnore + public MongoStoragePlugin getStoragePlugin() { + return storagePlugin; + } + + @Override + public String toString() { + return "MongoGroupScan [MongoScanSpec=" + scanSpec + ", columns=" + columns + + "]"; + } + + @VisibleForTesting + MongoGroupScan() { + } + + @JsonIgnore + @VisibleForTesting + void setChunksMapping(Map<String, Set<ServerAddress>> chunksMapping) { + this.chunksMapping = chunksMapping; + } + + @JsonIgnore + @VisibleForTesting + void setScanSpec(MongoScanSpec scanSpec) { + this.scanSpec = scanSpec; + } + + @JsonIgnore + @VisibleForTesting + void setInverseChunsMapping(Map<String, List<ChunkInfo>> chunksInverseMapping) { + this.chunksInverseMapping = chunksInverseMapping; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java new file mode 100644 index 0000000..9af49b1 --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mongo; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.FilterPrel; +import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.rex.RexNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; + +public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule { + private static final Logger logger = LoggerFactory + .getLogger(MongoPushDownFilterForScan.class); + public static final StoragePluginOptimizerRule INSTANCE = new MongoPushDownFilterForScan(); + + private MongoPushDownFilterForScan() { + super( + RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), + "MongoPushDownFilterForScan"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(1); + final FilterPrel filter = (FilterPrel) call.rel(0); + final RexNode condition = filter.getCondition(); + + MongoGroupScan groupScan = (MongoGroupScan) scan.getGroupScan(); + if (groupScan.isFilterPushedDown()) { + return; + } + + LogicalExpression conditionExp = DrillOptiq.toDrill( + new DrillParseContext(), scan, condition); + MongoFilterBuilder mongoFilterBuilder = new MongoFilterBuilder(groupScan, + conditionExp); + MongoScanSpec newScanSpec = mongoFilterBuilder.parseTree(); + if (newScanSpec == null) { + return; // no filter pushdown so nothing to apply. + } + + MongoGroupScan newGroupsScan = null; + try { + newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(), + newScanSpec, groupScan.getColumns()); + } catch (IOException e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + newGroupsScan.setFilterPushedDown(true); + + final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), + newGroupsScan, scan.getRowType()); + if (mongoFilterBuilder.isAllExpressionsConverted()) { + /* + * Since we could convert the entire filter condition expression into a + * Mongo filter, we can eliminate the filter operator altogether.
+ */ + call.transformTo(newScanPrel); + } else { + call.transformTo(filter.copy(filter.getTraitSet(), + ImmutableList.of((RelNode) newScanPrel))); + } + + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(1); + if (scan.getGroupScan() instanceof MongoGroupScan) { + return super.matches(call); + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java new file mode 100644 index 0000000..ad4e119 --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mongo; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.PathSegment.NameSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.complex.fn.JsonReaderWithState; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; + +public class MongoRecordReader extends AbstractRecordReader { + static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class); + + private static final int TARGET_RECORD_COUNT = 3000; + + private DBCollection collection; + private DBCursor cursor; + + private NullableVarCharVector valueVector; + + private JsonReaderWithState jsonReaderWithState; + private VectorContainerWriter writer; + private List<SchemaPath> columns; + + private BasicDBObject filters; + private DBObject fields; + + private MongoClientOptions clientOptions; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; + + private Boolean enableAllTextMode; + + public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, + List<SchemaPath> projectedColumns, FragmentContext context, + MongoClientOptions clientOptions) { + this.clientOptions = clientOptions; + this.fields = new BasicDBObject(); + // exclude _id field, if not mentioned by user. 
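+    // A 0 in this projection document suppresses _id; transformColumns() puts a 1 for every projected column, so an explicitly selected _id is re-included.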
+ this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0)); + this.columns = projectedColumns; + setColumns(projectedColumns); + transformColumns(projectedColumns); + this.fragmentContext = context; + this.filters = new BasicDBObject(); + Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters( + subScanSpec.getMinFilters(), subScanSpec.getMaxFilters()); + buildFilters(subScanSpec.getFilter(), mergedFilters); + enableAllTextMode = fragmentContext.getDrillbitContext().getOptionManager() + .getOption(ExecConstants.MONGO_ALL_TEXT_MODE).bool_val; + init(subScanSpec); + } + + @Override + protected Collection<SchemaPath> transformColumns( + Collection<SchemaPath> projectedColumns) { + Set<SchemaPath> transformed = Sets.newLinkedHashSet(); + if (!isStarQuery()) { + Iterator<SchemaPath> columnIterator = projectedColumns.iterator(); + while (columnIterator.hasNext()) { + SchemaPath column = columnIterator.next(); + NameSegment root = column.getRootSegment(); + String fieldName = root.getPath(); + transformed.add(SchemaPath.getSimplePath(fieldName)); + this.fields.put(fieldName, Integer.valueOf(1)); + } + } + return transformed; + } + + private void buildFilters(BasicDBObject pushdownFilters, + Map<String, List<BasicDBObject>> mergedFilters) { + for (Entry<String, List<BasicDBObject>> entry : mergedFilters.entrySet()) { + List<BasicDBObject> list = entry.getValue(); + if (list.size() == 1) { + this.filters.putAll(list.get(0).toMap()); + } else { + BasicDBObject andQueryFilter = new BasicDBObject(); + andQueryFilter.put("$and", list); + this.filters.putAll(andQueryFilter.toMap()); + } + } + if (pushdownFilters != null && !pushdownFilters.toMap().isEmpty()) { + if (!mergedFilters.isEmpty()) { + this.filters = MongoUtils.andFilterAtIndex(this.filters, + pushdownFilters); + } else { + this.filters = pushdownFilters; + } + } + } + + private void init(MongoSubScan.MongoSubScanSpec subScanSpec) { + try { + List<String> hosts = subScanSpec.getHosts(); + List<ServerAddress> addresses = Lists.newArrayList(); + for (String host : hosts) { + addresses.add(new ServerAddress(host)); + } + MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions); + DB db = client.getDB(subScanSpec.getDbName()); + db.setReadPreference(ReadPreference.nearest()); + collection = db.getCollection(subScanSpec.getCollectionName()); + } catch (UnknownHostException e) { + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override + public void setup(OutputMutator output) throws ExecutionSetupException { + if (isStarQuery()) { + try { + SchemaPath startColumn = SchemaPath.getSimplePath("*"); + MaterializedField field = MaterializedField.create(startColumn, + Types.optional(MinorType.VARCHAR)); + valueVector = output.addField(field, NullableVarCharVector.class); + } catch (SchemaChangeException e) { + throw new ExecutionSetupException(e); + } + } else { + try { + this.writer = new VectorContainerWriter(output); + this.jsonReaderWithState = new JsonReaderWithState( + fragmentContext.getManagedBuffer(), columns, enableAllTextMode); + } catch (IOException e) { + throw new ExecutionSetupException( + "Failure in Mongo JsonReader initialization.", e); + } + } + logger.info("Filters Applied : " + filters); + logger.info("Fields Selected :" + fields); + cursor = collection.find(filters, fields); + } + + private int handleNonStarQuery() { + writer.allocate(); + writer.reset(); + + int docCount = 0; + Stopwatch watch = new Stopwatch(); + watch.start(); + int rowCount = 0; + + try { + String errMsg = 
"Document {} is too big to fit into allocated ValueVector"; + done: for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) { + writer.setPosition(docCount); + String doc = cursor.next().toString(); + byte[] record = doc.getBytes(Charsets.UTF_8); + switch (jsonReaderWithState.write(record, writer)) { + case WRITE_SUCCEED: + docCount++; + break; + + case WRITE_FAILED: + if (docCount == 0) { + throw new DrillRuntimeException(errMsg); + } + logger.warn(errMsg, doc); + break done; + + default: + break done; + } + } + + for (SchemaPath sp : jsonReaderWithState.getNullColumns()) { + PathSegment root = sp.getRootSegment(); + BaseWriter.MapWriter fieldWriter = writer.rootAsMap(); + if (root.getChild() != null && !root.getChild().isArray()) { + fieldWriter = fieldWriter.map(root.getNameSegment().getPath()); + while (root.getChild().getChild() != null + && !root.getChild().isArray()) { + fieldWriter = fieldWriter.map(root.getChild().getNameSegment() + .getPath()); + root = root.getChild(); + } + fieldWriter.integer(root.getChild().getNameSegment().getPath()); + } else { + fieldWriter.integer(root.getNameSegment().getPath()); + } + } + + writer.setValueCount(docCount); + logger.debug("Took {} ms to get {} records", + watch.elapsed(TimeUnit.MILLISECONDS), rowCount); + return docCount; + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException("Failure while reading Mongo Record.", e); + } + } + + private int handleStarQuery() { + Stopwatch watch = new Stopwatch(); + watch.start(); + int rowCount = 0; + + if (valueVector == null) { + throw new DrillRuntimeException("Value vector is not initialized!!!"); + } + valueVector.clear(); + valueVector + .allocateNew(4 * 1024 * TARGET_RECORD_COUNT, TARGET_RECORD_COUNT); + + String errMsg = "Document {} is too big to fit into allocated ValueVector"; + + try { + for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) { + String doc = cursor.next().toString(); + byte[] record = doc.getBytes(Charsets.UTF_8); + if (!valueVector.getMutator().setSafe(rowCount, record, 0, + record.length)) { + logger.warn(errMsg, doc); + if (rowCount == 0) { + break; + } + } + } + valueVector.getMutator().setValueCount(rowCount); + logger.debug("Took {} ms to get {} records", + watch.elapsed(TimeUnit.MILLISECONDS), rowCount); + return rowCount; + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException("Failure while reading Mongo Record.", e); + } + } + + @Override + public int next() { + return isStarQuery() ? 
handleStarQuery() : handleNonStarQuery(); + } + + @Override + public void cleanup() { + } + + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java new file mode 100644 index 0000000..8e5fd7d --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.RecordReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.mongodb.MongoClientOptions; + +public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> { + static final Logger logger = LoggerFactory + .getLogger(MongoScanBatchCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, MongoSubScan subScan, + List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + List<RecordReader> readers = Lists.newArrayList(); + List<SchemaPath> columns = null; + for (MongoSubScan.MongoSubScanSpec scanSpec : subScan + .getChunkScanSpecList()) { + try { + if ((columns = subScan.getColumns()) == null) { + columns = GroupScan.ALL_COLUMNS; + } + MongoClientOptions clientOptions = subScan.getMongoPluginConfig() + .getMongoOptions(); + readers.add(new MongoRecordReader(scanSpec, columns, context, + clientOptions)); + } catch (Exception e) { + logger.error("MongoRecordReader creation failed for subScan: " + + subScan + "."); + logger.error(e.getMessage(), e); + throw new ExecutionSetupException(e); + } + } + logger.info("Number of record readers initialized : " + readers.size()); + return new 
ScanBatch(subScan, context, readers.iterator()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java new file mode 100644 index 0000000..d380207 --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanSpec.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.mongodb.BasicDBObject; + +public class MongoScanSpec { + private String dbName; + private String collectionName; + + private BasicDBObject filters; + + @JsonCreator + public MongoScanSpec(@JsonProperty("dbName") String dbName, + @JsonProperty("collectionName") String collectionName) { + this.dbName = dbName; + this.collectionName = collectionName; + } + + public MongoScanSpec(String dbName, String collectionName, + BasicDBObject filters) { + this.dbName = dbName; + this.collectionName = collectionName; + this.filters = filters; + } + + public String getDbName() { + return dbName; + } + + public String getCollectionName() { + return collectionName; + } + + public BasicDBObject getFilters() { + return filters; + } + + @Override + public String toString() { + return "MongoScanSpec [dbName=" + dbName + ", collectionName=" + + collectionName + ", filters=" + filters + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java new file mode 100644 index 0000000..e46d8ec --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo; + +import java.io.IOException; +import java.util.Set; + +import net.hydromatic.optiq.SchemaPlus; + +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; + +public class MongoStoragePlugin extends AbstractStoragePlugin { + static final Logger logger = LoggerFactory + .getLogger(MongoStoragePlugin.class); + + private DrillbitContext context; + private MongoStoragePluginConfig mongoConfig; + private MongoSchemaFactory schemaFactory; + + public MongoStoragePlugin(MongoStoragePluginConfig mongoConfig, + DrillbitContext context, String name) throws IOException, + ExecutionSetupException { + this.context = context; + this.mongoConfig = mongoConfig; + this.schemaFactory = new MongoSchemaFactory(this, name); + } + + public DrillbitContext getContext() { + return this.context; + } + + @Override + public MongoStoragePluginConfig getConfig() { + return mongoConfig; + } + + @Override + public void registerSchemas(UserSession session, SchemaPlus parent) { + schemaFactory.registerSchemas(session, parent); + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public AbstractGroupScan getPhysicalScan(JSONOptions selection) + throws IOException { + MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), + new TypeReference<MongoScanSpec>() { + }); + return new MongoGroupScan(this, mongoScanSpec, null); + } + + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java new file mode 100644 index 0000000..b7cbf24 --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePluginConfig.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mongo; + +import org.apache.drill.common.logical.StoragePluginConfig; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientURI; + +@JsonTypeName(MongoStoragePluginConfig.NAME) +public class MongoStoragePluginConfig extends StoragePluginConfig { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(MongoStoragePluginConfig.class); + + public static final String NAME = "mongo"; + + private String connection; + + @JsonIgnore + private MongoClientURI clientURI; + + @JsonCreator + public MongoStoragePluginConfig(@JsonProperty("connection") String connection) { + this.connection = connection; + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } else if (that == null || getClass() != that.getClass()) { + return false; + } + MongoStoragePluginConfig thatConfig = (MongoStoragePluginConfig) that; + return this.connection.equals(thatConfig.connection); + + } + + @Override + public int hashCode() { + return this.connection != null ? this.connection.hashCode() : 0; + } + + @JsonIgnore + public MongoClientOptions getMongoOptions() { + MongoClientURI clientURI = new MongoClientURI(connection); + return clientURI.getOptions(); + } + + public String getConnection() { + return connection; + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2ca9c907/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java new file mode 100644 index 0000000..36008cf --- /dev/null +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoSubScan.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mongo; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.mongodb.BasicDBObject; + +@JsonTypeName("mongo-shard-read") +public class MongoSubScan extends AbstractBase implements SubScan { + static final Logger logger = LoggerFactory.getLogger(MongoSubScan.class); + + @JsonProperty + private final MongoStoragePluginConfig mongoPluginConfig; + @JsonIgnore + private final MongoStoragePlugin mongoStoragePlugin; + private final List<SchemaPath> columns; + + private final List<MongoSubScanSpec> chunkScanSpecList; + + @JsonCreator + public MongoSubScan( + @JacksonInject StoragePluginRegistry registry, + @JsonProperty("mongoPluginConfig") StoragePluginConfig mongoPluginConfig, + @JsonProperty("chunkScanSpecList") LinkedList<MongoSubScanSpec> chunkScanSpecList, + @JsonProperty("columns") List<SchemaPath> columns) + throws ExecutionSetupException { + this.columns = columns; + this.mongoPluginConfig = (MongoStoragePluginConfig) mongoPluginConfig; + this.mongoStoragePlugin = (MongoStoragePlugin) registry + .getPlugin(mongoPluginConfig); + this.chunkScanSpecList = chunkScanSpecList; + } + + public MongoSubScan(MongoStoragePlugin storagePlugin, + MongoStoragePluginConfig storagePluginConfig, + List<MongoSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) { + this.mongoStoragePlugin = storagePlugin; + this.mongoPluginConfig = storagePluginConfig; + this.columns = columns; + this.chunkScanSpecList = chunkScanSpecList; + } + + @Override + public <T, X, E extends Throwable> T accept( + PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + @JsonIgnore + public MongoStoragePluginConfig getMongoPluginConfig() { + return mongoPluginConfig; + } + + @JsonIgnore + public MongoStoragePlugin getMongoStoragePlugin() { + return mongoStoragePlugin; + } + + public List<SchemaPath> getColumns() { + return columns; + } + + public List<MongoSubScanSpec> getChunkScanSpecList() { + return chunkScanSpecList; + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + return new MongoSubScan(mongoStoragePlugin, mongoPluginConfig, + chunkScanSpecList, columns); + } + + @Override + public int getOperatorType() { + return 1009; + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return Iterators.emptyIterator(); + } + + public static class MongoSubScanSpec { + + protected String dbName; + protected String collectionName; + 
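+    // Addresses of the shard serving this chunk; minFilters/maxFilters hold
+    // the chunk's lower/upper shard-key bounds, which MongoRecordReader
+    // merges into range predicates via MongoUtils.mergeFilters().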
protected List<String> hosts;
+    protected Map<String, Object> minFilters;
+    protected Map<String, Object> maxFilters;
+
+    protected BasicDBObject filter;
+
+    // Use the com.fasterxml @JsonCreator imported above; the shaded
+    // parquet/codehaus annotation is not visible to Drill's ObjectMapper.
+    @JsonCreator
+    public MongoSubScanSpec(@JsonProperty("dbName") String dbName,
+        @JsonProperty("collectionName") String collectionName,
+        @JsonProperty("hosts") List<String> hosts,
+        @JsonProperty("minFilters") Map<String, Object> minFilters,
+        @JsonProperty("maxFilters") Map<String, Object> maxFilters,
+        @JsonProperty("filters") BasicDBObject filters) {
+      this.dbName = dbName;
+      this.collectionName = collectionName;
+      this.hosts = hosts;
+      this.minFilters = minFilters;
+      this.maxFilters = maxFilters;
+      this.filter = filters;
+    }
+
+    MongoSubScanSpec() {
+
+    }
+
+    public String getDbName() {
+      return dbName;
+    }
+
+    public MongoSubScanSpec setDbName(String dbName) {
+      this.dbName = dbName;
+      return this;
+    }
+
+    public String getCollectionName() {
+      return collectionName;
+    }
+
+    public MongoSubScanSpec setCollectionName(String collectionName) {
+      this.collectionName = collectionName;
+      return this;
+    }
+
+    public List<String> getHosts() {
+      return hosts;
+    }
+
+    public MongoSubScanSpec setHosts(List<String> hosts) {
+      this.hosts = hosts;
+      return this;
+    }
+
+    public Map<String, Object> getMinFilters() {
+      return minFilters;
+    }
+
+    public MongoSubScanSpec setMinFilters(Map<String, Object> minFilters) {
+      this.minFilters = minFilters;
+      return this;
+    }
+
+    public Map<String, Object> getMaxFilters() {
+      return maxFilters;
+    }
+
+    public MongoSubScanSpec setMaxFilters(Map<String, Object> maxFilters) {
+      this.maxFilters = maxFilters;
+      return this;
+    }
+
+    public BasicDBObject getFilter() {
+      return filter;
+    }
+
+    public MongoSubScanSpec setFilter(BasicDBObject filter) {
+      this.filter = filter;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "MongoSubScanSpec [dbName=" + dbName + ", collectionName="
+          + collectionName + ", hosts=" + hosts + ", minFilters=" + minFilters
+          + ", maxFilters=" + maxFilters + ", filter=" + filter + "]";
+    }
+
+  }
+
+}
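
For reference, the following is a minimal, self-contained sketch (not part of the commit) of the filter composition performed in MongoRecordReader.buildFilters() above: a chunk's min/max shard-key bounds arrive as range predicates that are AND-ed per key, and any filter pushed down from the SQL WHERE clause is then conjoined with that chunk range (the commit delegates the final conjunction to MongoUtils.andFilterAtIndex; a plain $and approximates it here). The class name, shard key, and literal values are hypothetical; only the mongo-java-driver API declared in the pom is used.

import java.util.Arrays;
import java.util.List;

import com.mongodb.BasicDBObject;

public class MongoChunkFilterSketch {
  public static void main(String[] args) {
    // Hypothetical chunk range on shard key "user_id": [100, 200).
    BasicDBObject min = new BasicDBObject("user_id", new BasicDBObject("$gte", 100));
    BasicDBObject max = new BasicDBObject("user_id", new BasicDBObject("$lt", 200));
    List<BasicDBObject> bounds = Arrays.asList(min, max);

    // Two bounds share one key, so buildFilters() wraps them in $and
    // instead of merging them into a single document.
    BasicDBObject chunkFilter = new BasicDBObject("$and", bounds);

    // A hypothetical filter pushed down from the SQL WHERE clause.
    BasicDBObject pushdown = new BasicDBObject("status", "ACTIVE");

    // Conjoin chunk range and pushdown filter; the record reader hands the
    // result to collection.find(filters, fields).
    BasicDBObject query = new BasicDBObject("$and",
        Arrays.asList(chunkFilter, pushdown));

    // Prints the composed query document (formatting approximate):
    // { "$and" : [ { "$and" : [ { "user_id" : { "$gte" : 100 } } ,
    //   { "user_id" : { "$lt" : 200 } } ] } , { "status" : "ACTIVE" } ] }
    System.out.println(query);
  }
}

Composing with $and rather than merging keys into one document matters because BasicDBObject is a map: putting both a $gte and a $lt bound under the same key would overwrite one another, which is why buildFilters() switches to $and whenever a key carries more than one predicate.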