DRILL-3177: Part 1, upgrading Mongo Java Driver to 3.0.1
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5bb75b2d Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5bb75b2d Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5bb75b2d Branch: refs/heads/master Commit: 5bb75b2d1fc23586aff40ebfd93a7ce2084d01ad Parents: c7c2236 Author: Kamesh <[email protected]> Authored: Sat May 23 07:38:26 2015 +0530 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jun 25 19:58:47 2015 -0700 ---------------------------------------------------------------------- contrib/storage-mongo/pom.xml | 2 +- .../drill/exec/store/mongo/MongoGroupScan.java | 76 ++++++++++---------- .../exec/store/mongo/MongoRecordReader.java | 21 +++--- .../drill/exec/store/mongo/MongoUtils.java | 12 ---- .../store/mongo/schema/MongoSchemaFactory.java | 21 +++--- 5 files changed, 62 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml index 8168861..ce4ad7e 100644 --- a/contrib/storage-mongo/pom.xml +++ b/contrib/storage-mongo/pom.xml @@ -38,7 +38,7 @@ <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> - <version>2.12.2</version> + <version>3.0.1</version> <scope>compile</scope> </dependency> http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java index e33d2ae..b7885d3 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java @@ -47,6 +47,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.mongo.MongoSubScan.MongoSubScanSpec; import org.apache.drill.exec.store.mongo.common.ChunkInfo; +import org.bson.Document; import org.bson.types.MaxKey; import org.bson.types.MinKey; import org.slf4j.Logger; @@ -65,15 +66,15 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.mongodb.BasicDBObject; -import com.mongodb.CommandResult; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; @JsonTypeName("mongo-scan") public class MongoGroupScan extends AbstractGroupScan implements @@ -165,12 +166,12 @@ public class MongoGroupScan extends AbstractGroupScan implements } private boolean isShardedCluster(MongoClient client) { - DB db = client.getDB(scanSpec.getDbName()); - String msg = db.command("isMaster").getString("msg"); + MongoDatabase db = client.getDatabase(scanSpec.getDbName()); + String msg = db.runCommand(new Document("isMaster", 1)).getString("msg"); return msg == null ? false : msg.equals("isdbgrid"); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({ "rawtypes" }) private void init() throws IOException { try { List<String> h = storagePluginConfig.getHosts(); @@ -184,35 +185,37 @@ public class MongoGroupScan extends AbstractGroupScan implements chunksMapping = Maps.newHashMap(); chunksInverseMapping = Maps.newLinkedHashMap(); if (isShardedCluster(client)) { - DB db = client.getDB(CONFIG); - DBCollection chunksCollection = db.getCollectionFromString(CHUNKS); - DBObject query = new BasicDBObject(1); - query + MongoDatabase db = client.getDatabase(CONFIG); + MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS); + Document filter = new Document(); + filter .put( NS, this.scanSpec.getDbName() + "." + this.scanSpec.getCollectionName()); - DBObject fields = new BasicDBObject(); - fields.put(SHARD, select); - fields.put(MIN, select); - fields.put(MAX, select); + Document projection = new Document(); + projection.put(SHARD, select); + projection.put(MIN, select); + projection.put(MAX, select); - DBCursor chunkCursor = chunksCollection.find(query, fields); + FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection); + MongoCursor<Document> iterator = chunkCursor.iterator(); - DBCollection shardsCollection = db.getCollectionFromString(SHARDS); + MongoCollection<Document> shardsCollection = db.getCollection(SHARDS); - fields = new BasicDBObject(); - fields.put(HOST, select); + projection = new Document(); + projection.put(HOST, select); - while (chunkCursor.hasNext()) { - DBObject chunkObj = chunkCursor.next(); + while (iterator.hasNext()) { + Document chunkObj = iterator.next(); String shardName = (String) chunkObj.get(SHARD); String chunkId = (String) chunkObj.get(ID); - query = new BasicDBObject().append(ID, shardName); - DBCursor hostCursor = shardsCollection.find(query, fields); - while (hostCursor.hasNext()) { - DBObject hostObj = hostCursor.next(); + filter = new Document(ID, shardName); + FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection); + MongoCursor<Document> hostIterator = hostCursor.iterator(); + while (hostIterator.hasNext()) { + Document hostObj = hostIterator.next(); String hostEntry = (String) hostObj.get(HOST); String[] tagAndHost = StringUtils.split(hostEntry, '/'); String[] hosts = tagAndHost.length > 1 ? StringUtils.split( @@ -300,27 +303,27 @@ public class MongoGroupScan extends AbstractGroupScan implements private Set<ServerAddress> getPreferredHosts(MongoClient client, List<String> hosts) throws UnknownHostException { Set<ServerAddress> addressList = Sets.newHashSet(); - DB db = client.getDB(scanSpec.getDbName()); + MongoDatabase db = client.getDatabase(scanSpec.getDbName()); ReadPreference readPreference = client.getReadPreference(); + Document command = db.runCommand(new Document("isMaster", 1)); + switch (readPreference.getName().toUpperCase()) { case "PRIMARY": case "PRIMARYPREFERRED": - String primaryHost = db.command("isMaster").getString("primary"); + String primaryHost = command.getString("primary"); addressList.add(new ServerAddress(primaryHost)); return addressList; case "SECONDARY": case "SECONDARYPREFERRED": - primaryHost = db.command("isMaster").getString("primary"); - @SuppressWarnings("unchecked") - List<String> hostsList = (List<String>) db.command("isMaster").get( - "hosts"); + primaryHost = command.getString("primary"); + List<String> hostsList = (List<String>) command.get("hosts"); hostsList.remove(primaryHost); for (String host : hostsList) { addressList.add(new ServerAddress(host)); } return addressList; case "NEAREST": - hostsList = (List<String>) db.command("isMaster").get("hosts"); + hostsList = (List<String>) command.get("hosts"); for (String host : hostsList) { addressList.add(new ServerAddress(host)); } @@ -475,12 +478,13 @@ public class MongoGroupScan extends AbstractGroupScan implements } MongoClient client = MongoCnxnManager.getClient(addresses, clientURI.getOptions(), clientURI.getCredentials()); - DB db = client.getDB(scanSpec.getDbName()); - DBCollection collection = db.getCollectionFromString(scanSpec + MongoDatabase db = client.getDatabase(scanSpec.getDbName()); + MongoCollection<Document> collection = db.getCollection(scanSpec .getCollectionName()); - CommandResult stats = collection.getStats(); + String json = collection.find().first().toJson(); + float approxDiskCost = json.getBytes().length * collection.count(); return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, - stats.getLong(COUNT), 1, (float) stats.getDouble(SIZE)); + collection.count(), 1, approxDiskCost); } catch (Exception e) { throw new DrillRuntimeException(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java index 182f5a4..40fc810 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.complex.fn.JsonReader; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,27 +46,25 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.mongodb.BasicDBObject; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoCredential; -import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; public class MongoRecordReader extends AbstractRecordReader { static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class); - private DBCollection collection; - private DBCursor cursor; + private MongoCollection<Document> collection; + private MongoCursor<Document> cursor; private JsonReader jsonReader; private VectorContainerWriter writer; private BasicDBObject filters; - private DBObject fields; + private BasicDBObject fields; private MongoClientOptions clientOptions; private MongoCredential credential; @@ -141,7 +140,7 @@ public class MongoRecordReader extends AbstractRecordReader { } MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions, credential); - DB db = client.getDB(subScanSpec.getDbName()); + MongoDatabase db = client.getDatabase(subScanSpec.getDbName()); collection = db.getCollection(subScanSpec.getCollectionName()); } catch (UnknownHostException e) { throw new DrillRuntimeException(e.getMessage(), e); @@ -155,7 +154,7 @@ public class MongoRecordReader extends AbstractRecordReader { this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble); logger.info("Filters Applied : " + filters); logger.info("Fields Selected :" + fields); - cursor = collection.find(filters, fields); + cursor = collection.find(filters).projection(fields).iterator(); } @Override @@ -170,7 +169,7 @@ public class MongoRecordReader extends AbstractRecordReader { try { while (docCount < BaseValueVector.INITIAL_VALUE_ALLOCATION && cursor.hasNext()) { writer.setPosition(docCount); - String doc = cursor.next().toString(); + String doc = cursor.next().toJson(); jsonReader.setSource(doc.getBytes(Charsets.UTF_8)); jsonReader.write(writer); docCount++; http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java index b43a22f..a5fb2ad 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoUtils.java @@ -22,13 +22,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.bson.LazyBSONCallback; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; -import com.mongodb.LazyWriteableDBObject; public class MongoUtils { @@ -52,14 +48,6 @@ public class MongoUtils { return orQueryFilter; } - public static BasicDBObject deserializeFilter(byte[] filterBytes) { - DBObject dbo = new LazyWriteableDBObject(filterBytes, - new LazyBSONCallback()); - BasicDBObject result = new BasicDBObject(); - result.putAll(dbo); - return result; - } - public static Map<String, List<BasicDBObject>> mergeFilters( Map<String, Object> minFilters, Map<String, Object> maxFilters) { Map<String, List<BasicDBObject>> filters = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/drill/blob/5bb75b2d/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java index fccffb5..d453fb9 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java @@ -27,10 +27,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Maps; -import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; - import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.planner.logical.DrillTable; @@ -50,13 +47,13 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.mongodb.DB; import com.mongodb.MongoClientOptions; import com.mongodb.MongoCredential; import com.mongodb.MongoException; -import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; +import com.mongodb.client.MongoDatabase; public class MongoSchemaFactory implements SchemaFactory { @@ -106,8 +103,10 @@ public class MongoSchemaFactory implements SchemaFactory { throw new UnsupportedOperationException(); } try { - return MongoCnxnManager.getClient(addresses, options, credential) - .getDatabaseNames(); + List<String> dbNames = new ArrayList<>(); + MongoCnxnManager.getClient(addresses, options, credential) + .listDatabaseNames().into(dbNames); + return dbNames; } catch (MongoException me) { logger.warn("Failure while loading databases in Mongo. {}", me.getMessage()); @@ -124,9 +123,11 @@ public class MongoSchemaFactory implements SchemaFactory { @Override public List<String> load(String dbName) throws Exception { try { - DB db = MongoCnxnManager.getClient(addresses, options, credential) - .getDB(dbName); - return new ArrayList<>(db.getCollectionNames()); + MongoDatabase db = MongoCnxnManager.getClient(addresses, options, + credential).getDatabase(dbName); + List<String> collectionNames = new ArrayList<>(); + db.listCollectionNames().into(collectionNames); + return collectionNames; } catch (MongoException me) { logger.warn("Failure while getting collection names from '{}'. {}", dbName, me.getMessage());
