Repository: gora Updated Branches: refs/heads/master b0c9e89f8 -> a3f44256d
GORA-427 Configure MongoDB ReadPreference and WriteConcern (drazzib). Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/a3f44256 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/a3f44256 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/a3f44256 Branch: refs/heads/master Commit: a3f44256d6648962b8bb50e717e79698180a32c2 Parents: b0c9e89 Author: Damien Raude-Morvan <[email protected]> Authored: Mon Aug 3 18:58:16 2015 +0200 Committer: Damien Raude-Morvan <[email protected]> Committed: Mon Aug 3 19:00:04 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/gora/mongodb/store/MongoStore.java | 121 ++++----------- .../mongodb/store/MongoStoreParameters.java | 155 +++++++++++++++++++ gora-mongodb/src/test/conf/gora.properties | 2 + .../gora/mongodb/GoraMongodbTestDriver.java | 3 +- 5 files changed, 191 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/a3f44256/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba06b7b..a149f82 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,8 @@ Apache Gora 0.6.1 Release - 02/03/2015 (dd/mm/yyyy) Release Report - http://s.apache.org/l69 +* GORA-427 Configure MongoDB ReadPreference and WriteConcern (drazzib) + * GORA-426 MongoDB cursor timeout on long running parse job (Alexander Yastrebov via drazzib) * GORA-424 Cache cursor size to improve performance (Alexander Yastrebov via drazzib) http://git-wip-us.apache.org/repos/asf/gora/blob/a3f44256/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java index e1abe0f..6794e70 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java @@ -72,44 +72,12 @@ public class MongoStore<K, T extends PersistentBase> extends public static final Logger LOG = LoggerFactory.getLogger(MongoStore.class); - // Configuration properties - - /** - * Property indicating if the hadoop configuration has priority or not - */ - public static final String PROP_OVERRIDING = "gora.mongodb.override_hadoop_configuration"; - - /** - * Property pointing to the file for the mapping - */ - public static final String PROP_MAPPING_FILE = "gora.mongodb.mapping.file"; - - /** - * Property pointing to the host where the server is running - */ - public static final String PROP_MONGO_SERVERS = "gora.mongodb.servers"; - - /** - * Property pointing to the username to connect to the server - */ - public static final String PROP_MONGO_LOGIN = "gora.mongodb.login"; - - /** - * Property pointing to the secret to connect to the server - */ - public static final String PROP_MONGO_SECRET = "gora.mongodb.secret"; - /** * Default value for mapping file */ public static final String DEFAULT_MAPPING_FILE = "/gora-mongodb-mapping.xml"; /** - * Property to select the database - */ - public static String PROP_MONGO_DB = "gora.mongodb.db"; - - /** * MongoDB client */ private static ConcurrentHashMap<String, MongoClient> mapsOfClients = new ConcurrentHashMap<String, MongoClient>(); @@ -138,24 +106,7 @@ public class MongoStore<K, T extends PersistentBase> extends final Class<T> pPersistentClass, final Properties properties) { try { LOG.debug("Initializing MongoDB store"); - - // Prepare the configuration - String vPropMappingFile = properties.getProperty(PROP_MAPPING_FILE, - DEFAULT_MAPPING_FILE); - String vPropMongoServers = properties.getProperty(PROP_MONGO_SERVERS); - String vPropMongoLogin = properties.getProperty(PROP_MONGO_LOGIN); - String vPropMongoSecret = properties.getProperty(PROP_MONGO_SECRET); - String vPropMongoDb = properties.getProperty(PROP_MONGO_DB); - String overrideHadoop = properties.getProperty(PROP_OVERRIDING); - if (!Boolean.parseBoolean(overrideHadoop)) { - LOG.debug("Hadoop configuration has priority."); - vPropMappingFile = getConf().get(PROP_MAPPING_FILE, vPropMappingFile); - vPropMongoServers = getConf() - .get(PROP_MONGO_SERVERS, vPropMongoServers); - vPropMongoLogin = getConf().get(PROP_MONGO_LOGIN, vPropMongoLogin); - vPropMongoSecret = getConf().get(PROP_MONGO_SECRET, vPropMongoSecret); - vPropMongoDb = getConf().get(PROP_MONGO_DB, vPropMongoDb); - } + MongoStoreParameters parameters = MongoStoreParameters.load(properties, getConf()); super.initialize(keyClass, pPersistentClass, properties); filterUtil = new MongoFilterUtil<K, T>(getConf()); @@ -163,18 +114,17 @@ public class MongoStore<K, T extends PersistentBase> extends // Load the mapping MongoMappingBuilder<K, T> builder = new MongoMappingBuilder<K, T>(this); LOG.debug("Initializing Mongo store with mapping {}.", - new Object[] { vPropMappingFile }); - builder.fromFile(vPropMappingFile); + new Object[] { parameters.getMappingFile() }); + builder.fromFile(parameters.getMappingFile()); mapping = builder.build(); // Prepare MongoDB connection - mongoClientDB = getDB(vPropMongoServers, vPropMongoDb, vPropMongoLogin, - vPropMongoSecret); + mongoClientDB = getDB(parameters); mongoClientColl = mongoClientDB .getCollection(mapping.getCollectionName()); LOG.info("Initialized Mongo store for database {} of {}.", new Object[] { - vPropMongoDb, vPropMongoServers }); + parameters.getDbname(), parameters.getServers() }); } catch (IOException e) { LOG.error("Error while initializing MongoDB store: {}", new Object[] { e.getMessage() }); @@ -184,7 +134,7 @@ public class MongoStore<K, T extends PersistentBase> extends /** * Retrieve a client connected to the MongoDB server to be used. - * + * * @param servers * This value should specify the host:port (at least one) for * connecting to remote MongoDB. Multiple values must be separated by @@ -192,19 +142,29 @@ public class MongoStore<K, T extends PersistentBase> extends * @return a {@link Mongo} instance connected to the server * @throws UnknownHostException */ - private MongoClient getClient(final String servers) + private MongoClient getClient(MongoStoreParameters params) throws UnknownHostException { // Configure options - MongoClientOptions opts = new MongoClientOptions.Builder() - .dbEncoderFactory(GoraDBEncoder.FACTORY) // Utf8 serialization! - .build(); + MongoClientOptions.Builder optBuilder = new MongoClientOptions.Builder() + .dbEncoderFactory(GoraDBEncoder.FACTORY); // Utf8 serialization! + if (params.getReadPreference() != null) { + optBuilder.readPreference(ReadPreference.valueOf(params.getReadPreference())); + } + if (params.getWriteConcern() != null) { + optBuilder.writeConcern(WriteConcern.valueOf(params.getWriteConcern())); + } + // If configuration contains a login + secret, try to authenticated with DB + List<MongoCredential> credentials = null; + if (params.getLogin() != null && params.getSecret() != null) { + credentials = new ArrayList<MongoCredential>(); + credentials.add(MongoCredential.createCredential(params.getLogin(), params.getDbname(), params.getSecret().toCharArray())); + } // Build server address List<ServerAddress> addrs = new ArrayList<ServerAddress>(); - Iterable<String> serversArray = Splitter.on(",").split(servers); + Iterable<String> serversArray = Splitter.on(",").split(params.getServers()); if (serversArray != null) { for (String server : serversArray) { - Iterable<String> params = Splitter.on(":").trimResults().split(server); - Iterator<String> paramsIterator = params.iterator(); + Iterator<String> paramsIterator = Splitter.on(":").trimResults().split(server).iterator(); if (!paramsIterator.hasNext()) { // No server, use default addrs.add(new ServerAddress()); @@ -220,41 +180,19 @@ public class MongoStore<K, T extends PersistentBase> extends } } // Connect to the Mongo server - return new MongoClient(addrs, opts); + return new MongoClient(addrs, credentials, optBuilder.build()); } /** * Get reference to Mongo DB, using credentials if not null. - * - * @param servers - * @param dbname - * Name of database to connect to. - * @param login - * Optionnal login for remote database. - * @param secret - * Optional secret for remote database. - * @return a {@link DB} instance from <tt>mongoClient</tt> or null if - * authentication request failed. */ - private DB getDB(final String servers, final String dbname, - final String login, final String secret) throws UnknownHostException { + private DB getDB(MongoStoreParameters parameters) throws UnknownHostException { // Get reference to Mongo DB - if (!mapsOfClients.containsKey(servers)) - mapsOfClients.put(servers, getClient(servers)); - DB db = mapsOfClients.get(servers).getDB(dbname); - // By default, we are authenticated - boolean auth = true; - // If configuration contains a login + secret, try to authenticated with DB - if (login != null && secret != null) { - auth = db.authenticate(login, secret.toCharArray()); - } - - if (auth) { - return db; - } else { - return null; - } + if (!mapsOfClients.containsKey(parameters.getServers())) + mapsOfClients.put(parameters.getServers(), getClient(parameters)); + DB db = mapsOfClients.get(parameters.getServers()).getDB(parameters.getDbname()); + return db; } public MongoMapping getMapping() { @@ -1033,4 +971,5 @@ public class MongoStore<K, T extends PersistentBase> extends } return key.replace("\u00B7", "."); } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a3f44256/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java new file mode 100644 index 0000000..2288156 --- /dev/null +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.mongodb.store; + +import com.mongodb.DB; +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +/** + * @author Damien Raude-Morvan + */ +public class MongoStoreParameters { + + // Configuration properties + + /** + * Property indicating if the hadoop configuration has priority or not + */ + public static final String PROP_OVERRIDING = "gora.mongodb.override_hadoop_configuration"; + + /** + * Property pointing to the file for the mapping + */ + public static final String PROP_MAPPING_FILE = "gora.mongodb.mapping.file"; + + /** + * Property pointing to the host where the server is running + */ + public static final String PROP_MONGO_SERVERS = "gora.mongodb.servers"; + + /** + * Property pointing to the username to connect to the server + */ + public static final String PROP_MONGO_LOGIN = "gora.mongodb.login"; + + /** + * Property pointing to the secret to connect to the server + */ + public static final String PROP_MONGO_SECRET = "gora.mongodb.secret"; + + /** + * Property pointing to MongoDB Read Preference value. + * + * @see <a href="http://docs.mongodb.org/manual/core/read-preference/">Read Preference in MongoDB Documentation</a> + * @see <a href="http://api.mongodb.org/java/current/com/mongodb/ReadPreference.html">ReadPreference in MongoDB Java Driver</a> + */ + public static final String PROP_MONGO_READPREFERENCE = "gora.mongodb.readpreference"; + + /** + * Property pointing to MongoDB Write Concern value. + * + * @see <a href="http://docs.mongodb.org/manual/core/write-concern/">Write Concern in MongoDB Documentation</a> + * @see <a href="http://api.mongodb.org/java/current/com/mongodb/WriteConcern.html">WriteConcern in MongoDB Java Driver</a> + */ + public static final String PROP_MONGO_WRITECONCERN = "gora.mongodb.writeconcern"; + + /** + * Property to select the database + */ + public static String PROP_MONGO_DB = "gora.mongodb.db"; + + private final String mappingFile; + private final String servers; + private final String dbname; + private final String login; + private final String secret; + private final String readPreference; + private final String writeConcern; + + /** + * @param mappingFile + * @param servers + * @param dbname Name of database to connect to. + * @param login Optionnal login for remote database. + * @param secret Optional secret for remote database. + * @param readPreference + * @param writeConcern @return a {@link DB} instance from <tt>mongoClient</tt> or null if + */ + private MongoStoreParameters(String mappingFile, String servers, String dbname, String login, String secret, String readPreference, String writeConcern) { + this.mappingFile = mappingFile; + this.servers = servers; + this.dbname = dbname; + this.login = login; + this.secret = secret; + this.readPreference = readPreference; + this.writeConcern = writeConcern; + } + + public String getMappingFile() { + return mappingFile; + } + + public String getServers() { + return servers; + } + + public String getDbname() { + return dbname; + } + + public String getLogin() { + return login; + } + + public String getSecret() { + return secret; + } + + public String getReadPreference() { + return readPreference; + } + + public String getWriteConcern() { + return writeConcern; + } + + public static MongoStoreParameters load(Properties properties, Configuration conf) { + // Prepare the configuration + String vPropMappingFile = properties.getProperty(PROP_MAPPING_FILE, MongoStore.DEFAULT_MAPPING_FILE); + String vPropMongoServers = properties.getProperty(PROP_MONGO_SERVERS); + String vPropMongoLogin = properties.getProperty(PROP_MONGO_LOGIN); + String vPropMongoSecret = properties.getProperty(PROP_MONGO_SECRET); + String vPropMongoDb = properties.getProperty(PROP_MONGO_DB); + String vPropMongoRead = properties.getProperty(PROP_MONGO_READPREFERENCE); + String vPropMongoWrite = properties.getProperty(PROP_MONGO_WRITECONCERN); + String overrideHadoop = properties.getProperty(PROP_OVERRIDING); + if (!Boolean.parseBoolean(overrideHadoop)) { + MongoStore.LOG.debug("Hadoop configuration has priority."); + vPropMappingFile = conf.get(PROP_MAPPING_FILE, vPropMappingFile); + vPropMongoServers = conf.get(PROP_MONGO_SERVERS, vPropMongoServers); + vPropMongoLogin = conf.get(PROP_MONGO_LOGIN, vPropMongoLogin); + vPropMongoSecret = conf.get(PROP_MONGO_SECRET, vPropMongoSecret); + vPropMongoDb = conf.get(PROP_MONGO_DB, vPropMongoDb); + vPropMongoRead = conf.get(PROP_MONGO_READPREFERENCE, vPropMongoRead); + vPropMongoWrite = conf.get(PROP_MONGO_WRITECONCERN, vPropMongoWrite); + } + return new MongoStoreParameters(vPropMappingFile, vPropMongoServers, vPropMongoDb, vPropMongoLogin, vPropMongoSecret, vPropMongoRead, vPropMongoWrite); + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/a3f44256/gora-mongodb/src/test/conf/gora.properties ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/test/conf/gora.properties b/gora-mongodb/src/test/conf/gora.properties index 40be223..39960df 100644 --- a/gora-mongodb/src/test/conf/gora.properties +++ b/gora-mongodb/src/test/conf/gora.properties @@ -25,4 +25,6 @@ gora.mongodb.override_hadoop_configuration=false gora.mongodb.mapping.file=/gora-mongodb-mapping.xml gora.mongodb.servers=localhost gora.mongodb.db=nutchtest +gora.mongodb.readpreference=primary +gora.mongodb.writeconcern=acknowledged http://git-wip-us.apache.org/repos/asf/gora/blob/a3f44256/gora-mongodb/src/test/java/org/apache/gora/mongodb/GoraMongodbTestDriver.java ---------------------------------------------------------------------- diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/GoraMongodbTestDriver.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/GoraMongodbTestDriver.java index aba167c..8cf65f5 100644 --- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/GoraMongodbTestDriver.java +++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/GoraMongodbTestDriver.java @@ -19,6 +19,7 @@ package org.apache.gora.mongodb; import org.apache.gora.GoraTestDriver; import org.apache.gora.mongodb.store.MongoStore; +import org.apache.gora.mongodb.store.MongoStoreParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,7 @@ public class GoraMongodbTestDriver extends GoraTestDriver { // Store Mongo server "host:port" in Hadoop configuration // so that MongoStore will be able to get it latter - conf.set(MongoStore.PROP_MONGO_SERVERS, "127.0.0.1:" + port); + conf.set(MongoStoreParameters.PROP_MONGO_SERVERS, "127.0.0.1:" + port); log.info("Starting embedded Mongodb server on {} port.", port); try {
