[CALCITE-1697] Update Mongo driver version to 3.5.0 (Vladimir Dolzhenko) Close apache/calcite#588
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/b88bd70a Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/b88bd70a Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/b88bd70a Branch: refs/heads/master Commit: b88bd70a95d5d1b9566141cd38aae5507c2bcea2 Parents: eac017c Author: Vladimir Dolzhenko <[email protected]> Authored: Thu Dec 21 17:29:21 2017 +0100 Committer: Julian Hyde <[email protected]> Committed: Tue Jan 2 18:42:29 2018 -0800 ---------------------------------------------------------------------- .../adapter/mongodb/MongoEnumerator.java | 39 +++---- .../calcite/adapter/mongodb/MongoSchema.java | 20 ++-- .../adapter/mongodb/MongoSchemaFactory.java | 51 ++++++++- .../calcite/adapter/mongodb/MongoTable.java | 106 +++++-------------- pom.xml | 2 +- 5 files changed, 105 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/b88bd70a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoEnumerator.java ---------------------------------------------------------------------- diff --git a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoEnumerator.java b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoEnumerator.java index 1294c58..bdc080c 100644 --- a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoEnumerator.java +++ b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoEnumerator.java @@ -21,8 +21,9 @@ import org.apache.calcite.linq4j.Enumerator; import org.apache.calcite.linq4j.function.Function1; import org.apache.calcite.linq4j.tree.Primitive; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; +import com.mongodb.client.MongoCursor; + +import org.bson.Document; import java.util.Date; import java.util.Iterator; @@ -31,8 +32,8 @@ import java.util.Map; /** Enumerator that reads from a MongoDB collection. */ class MongoEnumerator implements Enumerator<Object> { - private final Iterator<DBObject> cursor; - private final Function1<DBObject, Object> getter; + private final Iterator<Document> cursor; + private final Function1<Document, Object> getter; private Object current; /** Creates a MongoEnumerator. @@ -40,8 +41,8 @@ class MongoEnumerator implements Enumerator<Object> { * @param cursor Mongo iterator (usually a {@link com.mongodb.DBCursor}) * @param getter Converts an object into a list of fields */ - MongoEnumerator(Iterator<DBObject> cursor, - Function1<DBObject, Object> getter) { + MongoEnumerator(Iterator<Document> cursor, + Function1<Document, Object> getter) { this.cursor = cursor; this.getter = getter; } @@ -53,7 +54,7 @@ class MongoEnumerator implements Enumerator<Object> { public boolean moveNext() { try { if (cursor.hasNext()) { - DBObject map = cursor.next(); + Document map = cursor.next(); current = getter.apply(map); return true; } else { @@ -70,25 +71,25 @@ class MongoEnumerator implements Enumerator<Object> { } public void close() { - if (cursor instanceof DBCursor) { - ((DBCursor) cursor).close(); + if (cursor instanceof MongoCursor) { + ((MongoCursor) cursor).close(); } // AggregationOutput implements Iterator but not DBCursor. There is no // available close() method -- apparently there is no open resource. } - static Function1<DBObject, Map> mapGetter() { - return new Function1<DBObject, Map>() { - public Map apply(DBObject a0) { + static Function1<Document, Map> mapGetter() { + return new Function1<Document, Map>() { + public Map apply(Document a0) { return (Map) a0; } }; } - static Function1<DBObject, Object> singletonGetter(final String fieldName, + static Function1<Document, Object> singletonGetter(final String fieldName, final Class fieldClass) { - return new Function1<DBObject, Object>() { - public Object apply(DBObject a0) { + return new Function1<Document, Object>() { + public Object apply(Document a0) { return convert(a0.get(fieldName), fieldClass); } }; @@ -97,10 +98,10 @@ class MongoEnumerator implements Enumerator<Object> { /** * @param fields List of fields to project; or null to return map */ - static Function1<DBObject, Object[]> listGetter( + static Function1<Document, Object[]> listGetter( final List<Map.Entry<String, Class>> fields) { - return new Function1<DBObject, Object[]>() { - public Object[] apply(DBObject a0) { + return new Function1<Document, Object[]>() { + public Object[] apply(Document a0) { Object[] objects = new Object[fields.size()]; for (int i = 0; i < fields.size(); i++) { final Map.Entry<String, Class> field = fields.get(i); @@ -112,7 +113,7 @@ class MongoEnumerator implements Enumerator<Object> { }; } - static Function1<DBObject, Object> getter( + static Function1<Document, Object> getter( List<Map.Entry<String, Class>> fields) { //noinspection unchecked return fields == null http://git-wip-us.apache.org/repos/asf/calcite/blob/b88bd70a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchema.java ---------------------------------------------------------------------- diff --git a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchema.java b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchema.java index 5bfc3f6..40bdec5 100644 --- a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchema.java +++ b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchema.java @@ -20,9 +20,15 @@ import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import com.google.common.collect.ImmutableMap; -import com.mongodb.DB; + import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import com.mongodb.client.MongoDatabase; +import java.util.List; import java.util.Map; /** @@ -30,7 +36,7 @@ import java.util.Map; * is a MONGO file in that directory. */ public class MongoSchema extends AbstractSchema { - final DB mongoDb; + final MongoDatabase mongoDb; /** * Creates a MongoDB schema. @@ -38,11 +44,13 @@ public class MongoSchema extends AbstractSchema { * @param host Mongo host, e.g. "localhost" * @param database Mongo database name, e.g. "foodmart" */ - public MongoSchema(String host, String database) { + public MongoSchema(String host, String database, + List<MongoCredential> credentialsList, MongoClientOptions options) { super(); try { - MongoClient mongo = new MongoClient(host); - this.mongoDb = mongo.getDB(database); + final MongoClient mongo = + new MongoClient(new ServerAddress(host), credentialsList, options); + this.mongoDb = mongo.getDatabase(database); } catch (Exception e) { throw new RuntimeException(e); } @@ -50,7 +58,7 @@ public class MongoSchema extends AbstractSchema { @Override protected Map<String, Table> getTableMap() { final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); - for (String collectionName : mongoDb.getCollectionNames()) { + for (String collectionName : mongoDb.listCollectionNames()) { builder.put(collectionName, new MongoTable(collectionName)); } return builder.build(); http://git-wip-us.apache.org/repos/asf/calcite/blob/b88bd70a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchemaFactory.java ---------------------------------------------------------------------- diff --git a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchemaFactory.java b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchemaFactory.java index 46ceddb..1d8ce74 100644 --- a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchemaFactory.java +++ b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoSchemaFactory.java @@ -20,6 +20,12 @@ import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaFactory; import org.apache.calcite.schema.SchemaPlus; +import com.mongodb.AuthenticationMechanism; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoCredential; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -27,7 +33,6 @@ import java.util.Map; * * <p>Allows a custom schema to be included in a model.json file.</p> */ -@SuppressWarnings("UnusedDeclaration") public class MongoSchemaFactory implements SchemaFactory { // public constructor, per factory contract public MongoSchemaFactory() { @@ -35,10 +40,46 @@ public class MongoSchemaFactory implements SchemaFactory { public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { - Map map = (Map) operand; - String host = (String) map.get("host"); - String database = (String) map.get("database"); - return new MongoSchema(host, database); + final String host = (String) operand.get("host"); + final String database = (String) operand.get("database"); + final String authMechanismName = (String) operand.get("authMechanism"); + + final MongoClientOptions.Builder options = MongoClientOptions.builder(); + + final List<MongoCredential> credentials = new ArrayList<>(); + if (authMechanismName != null) { + final MongoCredential credential = createCredentials(operand); + credentials.add(credential); + } + + return new MongoSchema(host, database, credentials, options.build()); + } + + private MongoCredential createCredentials(Map<String, Object> map) { + final String authMechanismName = (String) map.get("authMechanism"); + final AuthenticationMechanism authenticationMechanism = + AuthenticationMechanism.fromMechanismName(authMechanismName); + final String username = (String) map.get("username"); + final String authDatabase = (String) map.get("authDatabase"); + final String password = (String) map.get("password"); + + switch (authenticationMechanism) { + case PLAIN: + return MongoCredential.createPlainCredential(username, authDatabase, + password.toCharArray()); + case SCRAM_SHA_1: + return MongoCredential.createScramSha1Credential(username, authDatabase, + password.toCharArray()); + case GSSAPI: + return MongoCredential.createGSSAPICredential(username); + case MONGODB_CR: + return MongoCredential.createMongoCRCredential(username, authDatabase, + password.toCharArray()); + case MONGODB_X509: + return MongoCredential.createMongoX509Credential(username); + } + throw new IllegalArgumentException("Unsupported authentication mechanism " + + authMechanismName); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/b88bd70a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoTable.java ---------------------------------------------------------------------- diff --git a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoTable.java b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoTable.java index fde5927..f00b5b4 100644 --- a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoTable.java +++ b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoTable.java @@ -34,24 +34,19 @@ import org.apache.calcite.schema.impl.AbstractTableQueryable; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Util; -import com.google.common.collect.Lists; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; -import com.mongodb.AggregationOptions; -import com.mongodb.AggregationOutput; -import com.mongodb.BasicDBList; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.util.JSON; +import org.bson.BsonDocument; +import org.bson.Document; +import org.bson.conversions.Bson; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import static org.apache.calcite.util.Static.cons; - /** * Table based on a MongoDB collection. */ @@ -102,19 +97,20 @@ public class MongoTable extends AbstractQueryableTable * @param fields List of fields to project; or null to return map * @return Enumerator of results */ - private Enumerable<Object> find(DB mongoDb, String filterJson, + private Enumerable<Object> find(MongoDatabase mongoDb, String filterJson, String projectJson, List<Map.Entry<String, Class>> fields) { - final DBCollection collection = + final MongoCollection collection = mongoDb.getCollection(collectionName); - final DBObject filter = - filterJson == null ? null : (DBObject) JSON.parse(filterJson); - final DBObject project = - projectJson == null ? null : (DBObject) JSON.parse(projectJson); - final Function1<DBObject, Object> getter = MongoEnumerator.getter(fields); + final Bson filter = + filterJson == null ? null : BsonDocument.parse(filterJson); + final Bson project = + projectJson == null ? null : BsonDocument.parse(projectJson); + final Function1<Document, Object> getter = MongoEnumerator.getter(fields); return new AbstractEnumerable<Object>() { public Enumerator<Object> enumerator() { - final DBCursor cursor = collection.find(filter, project); - return new MongoEnumerator(cursor, getter); + @SuppressWarnings("unchecked") final FindIterable<Document> cursor = + collection.find(filter).projection(project); + return new MongoEnumerator(cursor.iterator(), getter); } }; } @@ -132,57 +128,21 @@ public class MongoTable extends AbstractQueryableTable * @param operations One or more JSON strings * @return Enumerator of results */ - private Enumerable<Object> aggregate(final DB mongoDb, + private Enumerable<Object> aggregate(final MongoDatabase mongoDb, final List<Map.Entry<String, Class>> fields, final List<String> operations) { - final List<DBObject> list = new ArrayList<>(); - final BasicDBList versionArray = (BasicDBList) mongoDb - .command("buildInfo").get("versionArray"); - final Integer versionMajor = parseIntString(versionArray - .get(0).toString()); - final Integer versionMinor = parseIntString(versionArray - .get(1).toString()); -// final Integer versionMaintenance = parseIntString(versionArray -// .get(2).toString()); -// final Integer versionBuild = parseIntString(versionArray -// .get(3).toString()); - + final List<Bson> list = new ArrayList<>(); for (String operation : operations) { - list.add((DBObject) JSON.parse(operation)); + list.add(BsonDocument.parse(operation)); } - final DBObject first = list.get(0); - final List<DBObject> rest = Util.skip(list); - final Function1<DBObject, Object> getter = + final Function1<Document, Object> getter = MongoEnumerator.getter(fields); return new AbstractEnumerable<Object>() { public Enumerator<Object> enumerator() { - final Iterator<DBObject> resultIterator; + final Iterator<Document> resultIterator; try { - // Changed in version 2.6: The db.collection.aggregate() method - // returns a cursor - // and can return result sets of any size. - // See: http://docs.mongodb.org/manual/core/aggregation-pipeline - if (versionMajor > 1) { - // MongoDB version 2.6+ - if (versionMinor > 5) { - AggregationOptions options = AggregationOptions.builder() - .outputMode(AggregationOptions.OutputMode.CURSOR).build(); - // Warning - this can result in a very large ArrayList! - // but you should know your data and aggregate accordingly - final List<DBObject> resultAsArrayList = - Lists.newArrayList(mongoDb.getCollection(collectionName) - .aggregate(list, options)); - resultIterator = resultAsArrayList.iterator(); - } else { // Pre MongoDB version 2.6 - AggregationOutput result = aggregateOldWay(mongoDb - .getCollection(collectionName), first, rest); - resultIterator = result.results().iterator(); - } - } else { // Pre MongoDB version 2 - AggregationOutput result = aggregateOldWay(mongoDb - .getCollection(collectionName), first, rest); - resultIterator = result.results().iterator(); - } + resultIterator = mongoDb.getCollection(collectionName) + .aggregate(list).iterator(); } catch (Exception e) { throw new RuntimeException("While running MongoDB query " + Util.toString(operations, "[", ",\n", "]"), e); @@ -200,24 +160,6 @@ public class MongoTable extends AbstractQueryableTable return Integer.parseInt(valueString.replaceAll("[^0-9]", "")); } - /** Executes an "aggregate" operation for pre-2.6 mongo servers. - * - * <p>Return document is limited to 4M or 16M in size depending on - * version of mongo. - - * <p>Helper method for - * {@link org.apache.calcite.adapter.mongodb.MongoTable#aggregate}. - * - * @param dbCollection Collection - * @param first First aggregate action - * @param rest Rest of the aggregate actions - * @return Aggregation output - */ - private AggregationOutput aggregateOldWay(DBCollection dbCollection, - DBObject first, List<DBObject> rest) { - return dbCollection.aggregate(cons(first, rest)); - } - /** Implementation of {@link org.apache.calcite.linq4j.Queryable} based on * a {@link org.apache.calcite.adapter.mongodb.MongoTable}. * @@ -235,7 +177,7 @@ public class MongoTable extends AbstractQueryableTable return enumerable.enumerator(); } - private DB getMongoDb() { + private MongoDatabase getMongoDb() { return schema.unwrap(MongoSchema.class).mongoDb; } http://git-wip-us.apache.org/repos/asf/calcite/blob/b88bd70a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d7444cc..d4269b0 100644 --- a/pom.xml +++ b/pom.xml @@ -111,7 +111,7 @@ limitations under the License. <!-- Apache 18 has 3.0.0, but need 3.0.1 for [MSOURCES-94]. --> <maven-source-plugin.version>3.0.1</maven-source-plugin.version> <mockito.version>2.5.5</mockito.version> - <mongo-java-driver.version>2.12.3</mongo-java-driver.version> + <mongo-java-driver.version>3.5.0</mongo-java-driver.version> <mysql-driver.version>5.1.20</mysql-driver.version> <natty.version>0.13</natty.version> <opencsv.version>2.3</opencsv.version>
