DRILL-3177: Part 2, various Mongo plugin enhancements - Fix issues with initial Mongo 3 update patch - Update Mongo connection management so we don't create an unbounded number of MongoClients - Move from static class to management inside Storage Plugin - Increase limit on number of connections - Move Mongo cursor initialization from setup to batch reading, so a query won't block indefinitely during setup - Update Mongo driver version to 3.0.2 - Update host access to use all hosts rather than just master - Remove references to no-longer-used UnknownHostException
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/453b363a Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/453b363a Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/453b363a Branch: refs/heads/master Commit: 453b363a7e3c97a2d57984c5e9553413c2d05c6a Parents: 5bb75b2 Author: Jacques Nadeau <[email protected]> Authored: Thu Jun 25 19:56:24 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Jun 25 19:58:47 2015 -0700 ---------------------------------------------------------------------- contrib/storage-mongo/pom.xml | 14 +- .../exec/store/mongo/MongoCnxnManager.java | 80 ------ .../drill/exec/store/mongo/MongoGroupScan.java | 256 +++++++++---------- .../exec/store/mongo/MongoRecordReader.java | 68 +++-- .../exec/store/mongo/MongoScanBatchCreator.java | 7 +- .../exec/store/mongo/MongoStoragePlugin.java | 70 ++++- .../store/mongo/schema/MongoSchemaFactory.java | 32 +-- 7 files changed, 235 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/pom.xml b/contrib/storage-mongo/pom.xml index ce4ad7e..2d7ad2d 100644 --- a/contrib/storage-mongo/pom.xml +++ b/contrib/storage-mongo/pom.xml @@ -34,13 +34,13 @@ <artifactId>drill-java-exec</artifactId> <version>${project.version}</version> </dependency> - - <dependency> - <groupId>org.mongodb</groupId> - <artifactId>mongo-java-driver</artifactId> - <version>3.0.1</version> - <scope>compile</scope> - </dependency> + + + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>3.0.2</version> + </dependency> <!-- Test dependencie --> <dependency> 
http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java deleted file mode 100644 index 35cc265..0000000 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoCnxnManager.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.mongo; - -import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoCredential; -import com.mongodb.ServerAddress; - -public class MongoCnxnManager { - - private static final Logger logger = LoggerFactory - .getLogger(MongoCnxnManager.class); - private static Cache<MongoCnxnKey, MongoClient> addressClientMap; - - static { - addressClientMap = CacheBuilder.newBuilder().maximumSize(5) - .expireAfterAccess(10, TimeUnit.MINUTES) - .removalListener(new AddressCloser()).build(); - } - - private static class AddressCloser implements - RemovalListener<MongoCnxnKey, MongoClient> { - @Override - public synchronized void onRemoval( - RemovalNotification<MongoCnxnKey, MongoClient> removal) { - removal.getValue().close(); - logger.debug("Closed connection to {}.", removal.getKey().toString()); - } - } - - public synchronized static MongoClient getClient( - List<ServerAddress> addresses, MongoClientOptions clientOptions, - MongoCredential credential) throws UnknownHostException { - // Take the first replica from the replicated servers - ServerAddress serverAddress = addresses.get(0); - String userName = credential == null ? 
null : credential.getUserName(); - MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName); - MongoClient client = addressClientMap.getIfPresent(key); - if (client == null) { - if (credential != null) { - List<MongoCredential> credentialList = Arrays.asList(credential); - client = new MongoClient(addresses, credentialList, clientOptions); - } else { - client = new MongoClient(addresses, clientOptions); - } - addressClientMap.put(key, client); - logger.debug("Created connection to {}.", key.toString()); - logger.debug("Number of open connections {}.", addressClientMap.size()); - } - return client; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java index b7885d3..6bf4d92 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoGroupScan.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.mongo; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -65,10 +64,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.client.FindIterable; @@ -173,157 +169,155 @@ public class MongoGroupScan extends AbstractGroupScan implements @SuppressWarnings({ 
"rawtypes" }) private void init() throws IOException { - try { - List<String> h = storagePluginConfig.getHosts(); - List<ServerAddress> addresses = Lists.newArrayList(); - for (String host : h) { - addresses.add(new ServerAddress(host)); - } - MongoClient client = MongoCnxnManager.getClient(addresses, - storagePluginConfig.getMongoOptions(), - storagePluginConfig.getMongoCrendials()); - chunksMapping = Maps.newHashMap(); - chunksInverseMapping = Maps.newLinkedHashMap(); - if (isShardedCluster(client)) { - MongoDatabase db = client.getDatabase(CONFIG); - MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS); - Document filter = new Document(); - filter - .put( - NS, - this.scanSpec.getDbName() + "." - + this.scanSpec.getCollectionName()); - - Document projection = new Document(); - projection.put(SHARD, select); - projection.put(MIN, select); - projection.put(MAX, select); - - FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection); - MongoCursor<Document> iterator = chunkCursor.iterator(); - - MongoCollection<Document> shardsCollection = db.getCollection(SHARDS); - - projection = new Document(); - projection.put(HOST, select); - - while (iterator.hasNext()) { - Document chunkObj = iterator.next(); - String shardName = (String) chunkObj.get(SHARD); - String chunkId = (String) chunkObj.get(ID); - filter = new Document(ID, shardName); - FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection); - MongoCursor<Document> hostIterator = hostCursor.iterator(); - while (hostIterator.hasNext()) { - Document hostObj = hostIterator.next(); - String hostEntry = (String) hostObj.get(HOST); - String[] tagAndHost = StringUtils.split(hostEntry, '/'); - String[] hosts = tagAndHost.length > 1 ? StringUtils.split( - tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ','); - List<String> chunkHosts = Arrays.asList(hosts); - //to get the address list from one of the shard nodes, need to get port. 
- MongoClient shardClient = new MongoClient(hosts[0]); - Set<ServerAddress> addressList = getPreferredHosts(shardClient, chunkHosts); - if (addressList == null) { - addressList = Sets.newHashSet(); - for (String host : chunkHosts) { - addressList.add(new ServerAddress(host)); - } - } - chunksMapping.put(chunkId, addressList); - ServerAddress address = addressList.iterator().next(); - List<ChunkInfo> chunkList = chunksInverseMapping.get(address - .getHost()); - if (chunkList == null) { - chunkList = Lists.newArrayList(); - chunksInverseMapping.put(address.getHost(), chunkList); - } - List<String> chunkHostsList = new ArrayList<String>(); - for(ServerAddress serverAddr : addressList){ - chunkHostsList.add(serverAddr.toString()); + + List<String> h = storagePluginConfig.getHosts(); + List<ServerAddress> addresses = Lists.newArrayList(); + for (String host : h) { + addresses.add(new ServerAddress(host)); + } + MongoClient client = storagePlugin.getClient(); + chunksMapping = Maps.newHashMap(); + chunksInverseMapping = Maps.newLinkedHashMap(); + if (isShardedCluster(client)) { + MongoDatabase db = client.getDatabase(CONFIG); + MongoCollection<Document> chunksCollection = db.getCollection(CHUNKS); + Document filter = new Document(); + filter + .put( + NS, + this.scanSpec.getDbName() + "." 
+ + this.scanSpec.getCollectionName()); + + Document projection = new Document(); + projection.put(SHARD, select); + projection.put(MIN, select); + projection.put(MAX, select); + + FindIterable<Document> chunkCursor = chunksCollection.find(filter).projection(projection); + MongoCursor<Document> iterator = chunkCursor.iterator(); + + MongoCollection<Document> shardsCollection = db.getCollection(SHARDS); + + projection = new Document(); + projection.put(HOST, select); + + while (iterator.hasNext()) { + Document chunkObj = iterator.next(); + String shardName = (String) chunkObj.get(SHARD); + String chunkId = (String) chunkObj.get(ID); + filter = new Document(ID, shardName); + FindIterable<Document> hostCursor = shardsCollection.find(filter).projection(projection); + MongoCursor<Document> hostIterator = hostCursor.iterator(); + while (hostIterator.hasNext()) { + Document hostObj = hostIterator.next(); + String hostEntry = (String) hostObj.get(HOST); + String[] tagAndHost = StringUtils.split(hostEntry, '/'); + String[] hosts = tagAndHost.length > 1 ? 
StringUtils.split( + tagAndHost[1], ',') : StringUtils.split(tagAndHost[0], ','); + List<String> chunkHosts = Arrays.asList(hosts); + Set<ServerAddress> addressList = getPreferredHosts(storagePlugin.getClient(addresses), chunkHosts); + if (addressList == null) { + addressList = Sets.newHashSet(); + for (String host : chunkHosts) { + addressList.add(new ServerAddress(host)); } - ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId); - DBObject minObj = (BasicDBObject) chunkObj.get(MIN); - - Map<String, Object> minFilters = Maps.newHashMap(); - Map minMap = minObj.toMap(); - Set keySet = minMap.keySet(); - for (Object keyObj : keySet) { - Object object = minMap.get(keyObj); - if (!(object instanceof MinKey)) { - minFilters.put(keyObj.toString(), object); - } + } + chunksMapping.put(chunkId, addressList); + ServerAddress address = addressList.iterator().next(); + List<ChunkInfo> chunkList = chunksInverseMapping.get(address + .getHost()); + if (chunkList == null) { + chunkList = Lists.newArrayList(); + chunksInverseMapping.put(address.getHost(), chunkList); + } + List<String> chunkHostsList = new ArrayList<String>(); + for (ServerAddress serverAddr : addressList) { + chunkHostsList.add(serverAddr.toString()); + } + ChunkInfo chunkInfo = new ChunkInfo(chunkHostsList, chunkId); + Document minMap = (Document) chunkObj.get(MIN); + + Map<String, Object> minFilters = Maps.newHashMap(); + Set keySet = minMap.keySet(); + for (Object keyObj : keySet) { + Object object = minMap.get(keyObj); + if (!(object instanceof MinKey)) { + minFilters.put(keyObj.toString(), object); } - chunkInfo.setMinFilters(minFilters); - - DBObject maxObj = (BasicDBObject) chunkObj.get(MAX); - Map<String, Object> maxFilters = Maps.newHashMap(); - Map maxMap = maxObj.toMap(); - keySet = maxMap.keySet(); - for (Object keyObj : keySet) { - Object object = maxMap.get(keyObj); - if (!(object instanceof MaxKey)) { - maxFilters.put(keyObj.toString(), object); - } + } + 
chunkInfo.setMinFilters(minFilters); + + Map<String, Object> maxFilters = Maps.newHashMap(); + Map maxMap = (Document) chunkObj.get(MAX); + keySet = maxMap.keySet(); + for (Object keyObj : keySet) { + Object object = maxMap.get(keyObj); + if (!(object instanceof MaxKey)) { + maxFilters.put(keyObj.toString(), object); } - - chunkInfo.setMaxFilters(maxFilters); - chunkList.add(chunkInfo); } + + chunkInfo.setMaxFilters(maxFilters); + chunkList.add(chunkInfo); } - } else { - String chunkName = scanSpec.getDbName() + "." - + scanSpec.getCollectionName(); - List<String> hosts = storagePluginConfig.getHosts(); - Set<ServerAddress> addressList = getPreferredHosts(client, hosts); - if (addressList == null) { - addressList = Sets.newHashSet(); - for (String host : hosts) { - addressList.add(new ServerAddress(host)); - } + } + } else { + String chunkName = scanSpec.getDbName() + "." + + scanSpec.getCollectionName(); + List<String> hosts = storagePluginConfig.getHosts(); + Set<ServerAddress> addressList = getPreferredHosts(client, hosts); + if (addressList == null) { + addressList = Sets.newHashSet(); + for (String host : hosts) { + addressList.add(new ServerAddress(host)); } - chunksMapping.put(chunkName, addressList); - - String host = hosts.get(0); - ServerAddress address = new ServerAddress(host); - ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName); - chunkInfo.setMinFilters(Collections.<String, Object> emptyMap()); - chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap()); - List<ChunkInfo> chunksList = Lists.newArrayList(); - chunksList.add(chunkInfo); - chunksInverseMapping.put(address.getHost(), chunksList); } - } catch (UnknownHostException e) { - throw new DrillRuntimeException(e.getMessage(), e); + chunksMapping.put(chunkName, addressList); + + String host = hosts.get(0); + ServerAddress address = new ServerAddress(host); + ChunkInfo chunkInfo = new ChunkInfo(hosts, chunkName); + chunkInfo.setMinFilters(Collections.<String, Object> emptyMap()); + 
chunkInfo.setMaxFilters(Collections.<String, Object> emptyMap()); + List<ChunkInfo> chunksList = Lists.newArrayList(); + chunksList.add(chunkInfo); + chunksInverseMapping.put(address.getHost(), chunksList); } } @SuppressWarnings("unchecked") - private Set<ServerAddress> getPreferredHosts(MongoClient client, - List<String> hosts) throws UnknownHostException { + private Set<ServerAddress> getPreferredHosts(MongoClient client, List<String> hosts) { Set<ServerAddress> addressList = Sets.newHashSet(); MongoDatabase db = client.getDatabase(scanSpec.getDbName()); ReadPreference readPreference = client.getReadPreference(); Document command = db.runCommand(new Document("isMaster", 1)); + final String primaryHost = command.getString("primary"); + final List<String> hostsList = (List<String>) command.get("hosts"); + switch (readPreference.getName().toUpperCase()) { case "PRIMARY": case "PRIMARYPREFERRED": - String primaryHost = command.getString("primary"); + if (primaryHost == null) { + return null; + } addressList.add(new ServerAddress(primaryHost)); return addressList; case "SECONDARY": case "SECONDARYPREFERRED": - primaryHost = command.getString("primary"); - List<String> hostsList = (List<String>) command.get("hosts"); + if (primaryHost == null || hostsList == null) { + return null; + } hostsList.remove(primaryHost); for (String host : hostsList) { addressList.add(new ServerAddress(host)); } return addressList; case "NEAREST": - hostsList = (List<String>) command.get("hosts"); + if (hostsList == null) { + return null; + } for (String host : hostsList) { addressList.add(new ServerAddress(host)); } @@ -468,16 +462,8 @@ public class MongoGroupScan extends AbstractGroupScan implements @Override public ScanStats getScanStats() { - MongoClientURI clientURI = new MongoClientURI( - this.storagePluginConfig.getConnection()); - try { - List<String> hosts = clientURI.getHosts(); - List<ServerAddress> addresses = Lists.newArrayList(); - for (String host : hosts) { - 
addresses.add(new ServerAddress(host)); - } - MongoClient client = MongoCnxnManager.getClient(addresses, - clientURI.getOptions(), clientURI.getCredentials()); + try{ + MongoClient client = storagePlugin.getClient(); MongoDatabase db = client.getDatabase(scanSpec.getDbName()); MongoCollection<Document> collection = db.getCollection(scanSpec .getCollectionName()); http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java index 40fc810..0ac519f 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.mongo; import java.io.IOException; -import java.net.UnknownHostException; import java.util.Collection; import java.util.List; import java.util.Map; @@ -47,8 +46,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -64,27 +61,29 @@ public class MongoRecordReader extends AbstractRecordReader { private VectorContainerWriter writer; private BasicDBObject filters; - private BasicDBObject fields; + private final BasicDBObject fields; - private MongoClientOptions clientOptions; - private MongoCredential credential; - private FragmentContext fragmentContext; + private final FragmentContext fragmentContext; private OperatorContext 
operatorContext; - private Boolean enableAllTextMode; - private Boolean readNumbersAsDouble; + private final MongoStoragePlugin plugin; - public MongoRecordReader(MongoSubScan.MongoSubScanSpec subScanSpec, - List<SchemaPath> projectedColumns, FragmentContext context, - MongoClientOptions clientOptions, MongoCredential credential) { - this.clientOptions = clientOptions; - this.credential = credential; - this.fields = new BasicDBObject(); + private final boolean enableAllTextMode; + private final boolean readNumbersAsDouble; + + public MongoRecordReader( + MongoSubScan.MongoSubScanSpec subScanSpec, + List<SchemaPath> projectedColumns, + FragmentContext context, + MongoStoragePlugin plugin) { + + fields = new BasicDBObject(); // exclude _id field, if not mentioned by user. - this.fields.put(DrillMongoConstants.ID, Integer.valueOf(0)); + fields.put(DrillMongoConstants.ID, Integer.valueOf(0)); setColumns(projectedColumns); - this.fragmentContext = context; - this.filters = new BasicDBObject(); + fragmentContext = context; + this.plugin = plugin; + filters = new BasicDBObject(); Map<String, List<BasicDBObject>> mergedFilters = MongoUtils.mergeFilters( subScanSpec.getMinFilters(), subScanSpec.getMaxFilters()); buildFilters(subScanSpec.getFilter(), mergedFilters); @@ -94,8 +93,7 @@ public class MongoRecordReader extends AbstractRecordReader { } @Override - protected Collection<SchemaPath> transformColumns( - Collection<SchemaPath> projectedColumns) { + protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) { Set<SchemaPath> transformed = Sets.newLinkedHashSet(); if (!isStarQuery()) { for (SchemaPath column : projectedColumns ) { @@ -132,19 +130,14 @@ public class MongoRecordReader extends AbstractRecordReader { } private void init(MongoSubScan.MongoSubScanSpec subScanSpec) { - try { - List<String> hosts = subScanSpec.getHosts(); - List<ServerAddress> addresses = Lists.newArrayList(); - for (String host : hosts) { - addresses.add(new 
ServerAddress(host)); - } - MongoClient client = MongoCnxnManager.getClient(addresses, clientOptions, - credential); - MongoDatabase db = client.getDatabase(subScanSpec.getDbName()); - collection = db.getCollection(subScanSpec.getCollectionName()); - } catch (UnknownHostException e) { - throw new DrillRuntimeException(e.getMessage(), e); + List<String> hosts = subScanSpec.getHosts(); + List<ServerAddress> addresses = Lists.newArrayList(); + for (String host : hosts) { + addresses.add(new ServerAddress(host)); } + MongoClient client = plugin.getClient(addresses); + MongoDatabase db = client.getDatabase(subScanSpec.getDbName()); + collection = db.getCollection(subScanSpec.getCollectionName()); } @Override @@ -152,13 +145,18 @@ public class MongoRecordReader extends AbstractRecordReader { this.operatorContext = context; this.writer = new VectorContainerWriter(output); this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false, readNumbersAsDouble); - logger.info("Filters Applied : " + filters); - logger.info("Fields Selected :" + fields); - cursor = collection.find(filters).projection(fields).iterator(); + } @Override public int next() { + if(cursor == null){ + logger.info("Filters Applied : " + filters); + logger.info("Fields Selected :" + fields); + cursor = collection.find(filters).projection(fields).batchSize(100).iterator(); + } + + writer.allocate(); writer.reset(); http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java index 3a8a496..49b1750 100644 --- 
a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java @@ -51,12 +51,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> { if ((columns = subScan.getColumns()) == null) { columns = GroupScan.ALL_COLUMNS; } - MongoClientOptions clientOptions = subScan.getMongoPluginConfig() - .getMongoOptions(); - MongoCredential mongoCrendials = subScan.getMongoPluginConfig() - .getMongoCrendials(); - readers.add(new MongoRecordReader(scanSpec, columns, context, - clientOptions, mongoCrendials)); + readers.add(new MongoRecordReader(scanSpec, columns, context, subScan.getMongoStoragePlugin())); } catch (Exception e) { logger.error("MongoRecordReader creation failed for subScan: " + subScan + "."); http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java index 38bc91d..093df57 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java @@ -18,10 +18,13 @@ package org.apache.drill.exec.store.mongo; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.calcite.schema.SchemaPlus; - import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.physical.base.AbstractGroupScan; @@ -35,21 +38,36 @@ import 
org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; public class MongoStoragePlugin extends AbstractStoragePlugin { static final Logger logger = LoggerFactory .getLogger(MongoStoragePlugin.class); - private DrillbitContext context; - private MongoStoragePluginConfig mongoConfig; - private MongoSchemaFactory schemaFactory; + private final DrillbitContext context; + private final MongoStoragePluginConfig mongoConfig; + private final MongoSchemaFactory schemaFactory; + private final Cache<MongoCnxnKey, MongoClient> addressClientMap; + private final MongoClientURI clientURI; public MongoStoragePlugin(MongoStoragePluginConfig mongoConfig, DrillbitContext context, String name) throws IOException, ExecutionSetupException { this.context = context; this.mongoConfig = mongoConfig; + this.clientURI = new MongoClientURI(this.mongoConfig.getConnection()); + this.addressClientMap = CacheBuilder.newBuilder() + .expireAfterAccess(24, TimeUnit.HOURS) + .removalListener(new AddressCloser()).build(); this.schemaFactory = new MongoSchemaFactory(this, name); } @@ -82,4 +100,48 @@ public class MongoStoragePlugin extends AbstractStoragePlugin { return ImmutableSet.of(MongoPushDownFilterForScan.INSTANCE); } + + private class AddressCloser implements + RemovalListener<MongoCnxnKey, MongoClient> { + @Override + public synchronized void onRemoval( + RemovalNotification<MongoCnxnKey, MongoClient> removal) { + removal.getValue().close(); + logger.debug("Closed connection to {}.", 
removal.getKey().toString()); + } + } + + public MongoClient getClient(String host) { + return getClient(Collections.singletonList(new ServerAddress(host))); + } + + public MongoClient getClient() { + List<String> hosts = clientURI.getHosts(); + List<ServerAddress> addresses = Lists.newArrayList(); + for (String host : hosts) { + addresses.add(new ServerAddress(host)); + } + return getClient(addresses); + } + + public synchronized MongoClient getClient(List<ServerAddress> addresses) { + // Take the first replica from the replicated servers + final ServerAddress serverAddress = addresses.get(0); + final MongoCredential credential = clientURI.getCredentials(); + String userName = credential == null ? null : credential.getUserName(); + MongoCnxnKey key = new MongoCnxnKey(serverAddress, userName); + MongoClient client = addressClientMap.getIfPresent(key); + if (client == null) { + if (credential != null) { + List<MongoCredential> credentialList = Arrays.asList(credential); + client = new MongoClient(addresses, credentialList, clientURI.getOptions()); + } else { + client = new MongoClient(addresses, clientURI.getOptions()); + } + addressClientMap.put(key, client); + logger.debug("Created connection to {}.", key.toString()); + logger.debug("Number of open connections {}.", addressClientMap.size()); + } + return client; + } } http://git-wip-us.apache.org/repos/asf/drill/blob/453b363a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java index d453fb9..9d08abd 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java +++ 
b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.mongo.schema; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -35,7 +34,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.SchemaFactory; -import org.apache.drill.exec.store.mongo.MongoCnxnManager; import org.apache.drill.exec.store.mongo.MongoScanSpec; import org.apache.drill.exec.store.mongo.MongoStoragePlugin; import org.apache.drill.exec.store.mongo.MongoStoragePluginConfig; @@ -46,13 +44,10 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoCredential; +import com.mongodb.MongoClient; import com.mongodb.MongoException; -import com.mongodb.ServerAddress; import com.mongodb.client.MongoDatabase; public class MongoSchemaFactory implements SchemaFactory { @@ -66,23 +61,12 @@ public class MongoSchemaFactory implements SchemaFactory { private LoadingCache<String, List<String>> tableNameLoader; private final String schemaName; private final MongoStoragePlugin plugin; + private final MongoClient client; - private final List<ServerAddress> addresses; - private final MongoClientOptions options; - private final MongoCredential credential; - - public MongoSchemaFactory(MongoStoragePlugin schema, String schemaName) - throws ExecutionSetupException, UnknownHostException { - this.plugin = schema; + public MongoSchemaFactory(MongoStoragePlugin plugin, String 
schemaName) throws ExecutionSetupException { + this.plugin = plugin; this.schemaName = schemaName; - - List<String> hosts = plugin.getConfig().getHosts(); - addresses = Lists.newArrayList(); - for (String host : hosts) { - addresses.add(new ServerAddress(host)); - } - options = plugin.getConfig().getMongoOptions(); - credential = plugin.getConfig().getMongoCrendials(); + this.client = plugin.getClient(); databases = CacheBuilder // .newBuilder() // @@ -104,8 +88,7 @@ public class MongoSchemaFactory implements SchemaFactory { } try { List<String> dbNames = new ArrayList<>(); - MongoCnxnManager.getClient(addresses, options, credential) - .listDatabaseNames().into(dbNames); + client.listDatabaseNames().into(dbNames); return dbNames; } catch (MongoException me) { logger.warn("Failure while loading databases in Mongo. {}", @@ -123,8 +106,7 @@ public class MongoSchemaFactory implements SchemaFactory { @Override public List<String> load(String dbName) throws Exception { try { - MongoDatabase db = MongoCnxnManager.getClient(addresses, options, - credential).getDatabase(dbName); + MongoDatabase db = client.getDatabase(dbName); List<String> collectionNames = new ArrayList<>(); db.listCollectionNames().into(collectionNames); return collectionNames;
