Repository: incubator-gobblin Updated Branches: refs/heads/master 1c5fb6ec4 -> 60adccfd9
[GOBBLIN-432] Share the DataSource used by the MySQL state stores Closes #2311 from htran1/mysql_state_store_share_data_source Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/60adccfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/60adccfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/60adccfd Branch: refs/heads/master Commit: 60adccfd96a33d19cfac8af6e26dd837b8156f89 Parents: 1c5fb6e Author: Hung Tran <[email protected]> Authored: Wed Mar 21 17:57:27 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Mar 21 17:57:27 2018 -0700 ---------------------------------------------------------------------- .../metastore/MysqlDataSourceFactory.java | 85 ++++++++++++++++++ .../gobblin/metastore/MysqlDataSourceKey.java | 71 +++++++++++++++ .../gobblin/metastore/MysqlStateStore.java | 14 +++ .../metastore/MysqlStateStoreFactory.java | 10 ++- .../metastore/MysqlDataSourceFactoryTest.java | 93 ++++++++++++++++++++ .../runtime/MysqlDatasetStateStoreFactory.java | 12 ++- 6 files changed, 278 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java new file mode 100644 index 0000000..45bd045 --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore; + +import java.io.IOException; + +import org.apache.commons.dbcp.BasicDataSource; + +import com.typesafe.config.Config; + +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopeType; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; + +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link SharedResourceFactory} for creating {@link BasicDataSource}s. + * + * The factory creates a {@link BasicDataSource} with the config. + */ +@Slf4j +public class MysqlDataSourceFactory<S extends ScopeType<S>> + implements SharedResourceFactory<BasicDataSource, MysqlDataSourceKey, S> { + + public static final String FACTORY_NAME = "basicDataSource"; + + /** + * Get a {@link BasicDataSource} based on the config + * @param config configuration + * @param broker broker + * @return a {@link BasicDataSource} + * @throws IOException + */ + public static <S extends ScopeType<S>> BasicDataSource get(Config config, + SharedResourcesBroker<S> broker) throws IOException { + try { + return broker.getSharedResource(new MysqlDataSourceFactory<S>(), + new MysqlDataSourceKey(MysqlStateStore.getDataSourceId(config), config)); + } catch (NotConfiguredException nce) { + throw new IOException(nce); + } + } + + @Override + public String getName() { + return FACTORY_NAME; + } + + @Override + public SharedResourceFactoryResponse<BasicDataSource> createResource(SharedResourcesBroker<S> broker, + ScopedConfigView<S, MysqlDataSourceKey> config) throws NotConfiguredException { + MysqlDataSourceKey key = config.getKey(); + Config configuration = key.getConfig(); + + BasicDataSource dataSource = MysqlStateStore.newDataSource(configuration); + + return new ResourceInstance<>(dataSource); + } + + @Override + public S getAutoScope(SharedResourcesBroker<S> broker, ConfigView<S, MysqlDataSourceKey> config) { + return broker.selfScope().getType().rootScope(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java new file mode 100644 index 0000000..bccab28 --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlDataSourceKey.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore; + +import org.apache.commons.dbcp.BasicDataSource; + +import com.typesafe.config.Config; + +import org.apache.gobblin.broker.iface.SharedResourceKey; + +import lombok.Getter; + +/** + * {@link SharedResourceKey} for requesting {@link BasicDataSource}s from a + * {@link org.apache.gobblin.broker.iface.SharedResourceFactory} + */ +@Getter +public class MysqlDataSourceKey implements SharedResourceKey { + private final String dataSourceName; + private final Config config; + + /** + * Constructs a key for the mysql data source. The dataSourceName is used as the key. + * @param dataSourceName an identifier for the data source + * @param config configuration that is passed along to configure the data source + */ + public MysqlDataSourceKey(String dataSourceName, Config config) { + this.dataSourceName = dataSourceName; + this.config = config; + } + + @Override + public String toConfigurationKey() { + return this.dataSourceName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MysqlDataSourceKey that = (MysqlDataSourceKey) o; + + return dataSourceName == null ? + that.dataSourceName == null : dataSourceName.equals(that.dataSourceName); + } + + @Override + public int hashCode() { + return dataSourceName != null ? dataSourceName.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java index b276702..d5ae6ff 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java @@ -176,6 +176,20 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { return basicDataSource; } + /** + * return an identifier for the data source based on the configuration + * @param config configuration + * @return a {@link String} to identify the data source + */ + public static String getDataSourceId(Config config) { + PasswordManager passwordManager = PasswordManager.getInstance(ConfigUtils.configToProperties(config)); + + return ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_JDBC_DRIVER_KEY, + ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER) + "::" + + config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY) + "::" + + passwordManager.readPassword(config.getString(ConfigurationKeys.STATE_STORE_DB_USER_KEY)); + } + @Override public boolean create(String storeName) throws IOException { /* nothing to do since state will be stored as a new row in a DB table that has been validated */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java index aad441a..4af3730 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java @@ -16,25 +16,29 @@ */ package org.apache.gobblin.metastore; +import org.apache.commons.dbcp.BasicDataSource; + import com.typesafe.config.Config; + import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.util.ConfigUtils; -import java.util.Properties; -import org.apache.commons.dbcp.BasicDataSource; @Alias("mysql") public class MysqlStateStoreFactory implements StateStore.Factory { @Override public <T extends State> StateStore<T> createStateStore(Config config, Class<T> stateClass) { - BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config); String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE); boolean compressedValues = ConfigUtils.getBoolean(config, ConfigurationKeys.STATE_STORE_COMPRESSED_VALUES_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES); try { + BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker()); + return new MysqlStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass); } catch (Exception e) { throw new RuntimeException("Failed to create MysqlStateStore with factory", e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java new file mode 100644 index 0000000..a11a4af --- /dev/null +++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/MysqlDataSourceFactoryTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.metastore; + +import java.io.IOException; + +import org.apache.commons.dbcp.BasicDataSource; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; + +/** + * Unit tests for {@link MysqlDataSourceFactory}. + */ +@Test(groups = { "gobblin.metastore" }) +public class MysqlDataSourceFactoryTest { + + @Test + public void testSameKey() throws IOException { + + Config config = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url", + ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user", + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd")); + + BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker()); + + BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker()); + + Assert.assertEquals(basicDataSource1, basicDataSource2); + } + + @Test + public void testDifferentKey() throws IOException { + + Config config1 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1", + ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user", + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd")); + + Config config2 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url2", + ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user", + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd")); + + BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config1, + SharedResourcesBrokerFactory.getImplicitBroker()); + + BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config2, + SharedResourcesBrokerFactory.getImplicitBroker()); + + Assert.assertNotEquals(basicDataSource1, basicDataSource2); + } + + @Test + public void testSameDbDifferentUser() throws IOException { + + Config config1 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1", + ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user1", + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd")); + + Config config2 = ConfigFactory.parseMap(ImmutableMap.of(ConfigurationKeys.STATE_STORE_DB_URL_KEY, "url1", + ConfigurationKeys.STATE_STORE_DB_USER_KEY, "user2", + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "dummypwd")); + + BasicDataSource basicDataSource1 = MysqlDataSourceFactory.get(config1, + SharedResourcesBrokerFactory.getImplicitBroker()); + + BasicDataSource basicDataSource2 = MysqlDataSourceFactory.get(config2, + SharedResourcesBrokerFactory.getImplicitBroker()); + + Assert.assertNotEquals(basicDataSource1, basicDataSource2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60adccfd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java index fea2a62..8d5b291 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreFactory.java @@ -16,19 +16,20 @@ */ package org.apache.gobblin.runtime; +import org.apache.commons.dbcp.BasicDataSource; + import com.typesafe.config.Config; + import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.DatasetStateStore; -import org.apache.gobblin.metastore.MysqlStateStore; -import java.util.Properties; -import org.apache.commons.dbcp.BasicDataSource; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; @Alias("mysql") public class MysqlDatasetStateStoreFactory implements DatasetStateStore.Factory { @Override public DatasetStateStore<JobState.DatasetState> createStateStore(Config config) { - BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config); String stateStoreTableName = config.hasPath(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY) ? config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY) : ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE; @@ -37,6 +38,9 @@ public class MysqlDatasetStateStoreFactory implements DatasetStateStore.Factory ConfigurationKeys.DEFAULT_STATE_STORE_COMPRESSED_VALUES; try { + BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config, + SharedResourcesBrokerFactory.getImplicitBroker()); + return new MysqlDatasetStateStore(basicDataSource, stateStoreTableName, compressedValues); } catch (Exception e) { throw new RuntimeException("Failed to create MysqlDatasetStateStore with factory", e);
