This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new b64cb3b  [GOBBLIN-1105] some refactoring and make MysqlJobStatusStateStore implements DatasetStateStore
b64cb3b is described below

commit b64cb3b3442e07b2906dfabe75376e3382fc9457
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Apr 8 11:53:04 2020 -0700

    [GOBBLIN-1105] some refactoring and make MysqlJobStatusStateStore implements DatasetStateStore

    Closes #2945 from arjun4084346/PR-3
---
 .../apache/gobblin/cluster/HelixJobsMapping.java   |  8 +--
 .../gobblin/metastore/DatasetStoreDataset.java     |  5 +-
 .../metastore/MysqlJobStatusStateStore.java        | 78 +++++++++++++++++++---
 ...a => MysqlJobStatusStateStoreEntryManager.java} | 38 ++++-------
 .../metastore/MysqlJobStatusStateStoreFactory.java |  9 ++-
 .../apache/gobblin/metastore/MysqlStateStore.java  |  4 +-
 .../gobblin/metastore/MysqlStateStoreFactory.java  |  2 +-
 .../metadata/DatasetStateStoreEntryManager.java    |  4 +-
 .../gobblin/metastore/ZkStateStoreFactory.java     |  1 -
 9 files changed, 102 insertions(+), 47 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 8124ba1..11a90da 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -79,12 +79,8 @@ public class HelixJobsMapping {
 
     try {
       stateStoreFactory = resolver.resolveClass(stateStoreType).newInstance();
-    } catch (ClassNotFoundException cnfe) {
-      throw new RuntimeException(cnfe);
-    } catch (InstantiationException ie) {
-      throw new RuntimeException(ie);
-    } catch (IllegalAccessException iae) {
-      throw new RuntimeException(iae);
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(e);
     }
     String dbTableKey = ConfigUtils.getString(sysConfig, JOBS_MAPPING_DB_TABLE_KEY,
DEFAULT_JOBS_MAPPING_DB_TABLE_KEY_NAME); diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java index 666cb56..45a1385 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java @@ -19,6 +19,8 @@ package org.apache.gobblin.metastore; import java.util.List; +import com.google.common.base.Strings; + import org.apache.gobblin.dataset.Dataset; import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; @@ -36,7 +38,8 @@ public class DatasetStoreDataset implements Dataset { @Override public String datasetURN() { - return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn(); + return this.key.getStoreName() + + (Strings.isNullOrEmpty(this.key.getSanitizedDatasetUrn()) ? "" : ":::" + this.key.getSanitizedDatasetUrn()); } /** diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java index 8c2117b..aa73b37 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java @@ -18,14 +18,25 @@ package org.apache.gobblin.metastore; import java.io.IOException; -import java.util.Collections; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Collection; import java.util.List; -import java.util.Properties; +import java.util.Map; + +import com.google.common.collect.Lists; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.State; +import 
org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.metastore.predicates.StoreNamePredicate; + @Slf4j /** @@ -33,14 +44,14 @@ import org.apache.gobblin.configuration.State; * * @param <T> state object type **/ -public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore { +public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore<T> implements DatasetStateStore<T> { /** * Manages the persistence and retrieval of {@link State} in a MySQL database * @param dataSource the {@link DataSource} object for connecting to MySQL * @param stateStoreTableName the table for storing the state in rows keyed by two levels (store_name, table_name) * @param compressedValues should values be compressed for storage? * @param stateClass class of the {@link State}s stored in this state store - * @throws IOException + * @throws IOException in case of failures */ public MysqlJobStatusStateStore(DataSource dataSource, String stateStoreTableName, boolean compressedValues, Class<T> stateClass) @@ -50,12 +61,63 @@ public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore { /** * Returns all the job statuses for a flow group, flow name, flow execution id - * @param storeName - * @param flowExecutionId - * @return - * @throws IOException + * @param storeName store name in the state store + * @param flowExecutionId Flow Execution Id + * @return list of states + * @throws IOException in case of failures */ public List<T> getAll(String storeName, long flowExecutionId) throws IOException { return getAll(storeName, flowExecutionId + "%", true); } + + @Override + public List<DatasetStateStoreEntryManager<T>> getMetadataForTables(StateStorePredicate predicate) + throws IOException { + List<DatasetStateStoreEntryManager<T>> entryManagers = Lists.newArrayList(); + + try (Connection connection = dataSource.getConnection(); + 
PreparedStatement queryStatement = connection.prepareStatement(SELECT_METADATA_SQL)) { + String storeName = predicate instanceof StoreNamePredicate ? ((StoreNamePredicate) predicate).getStoreName() : "%"; + queryStatement.setString(1, storeName); + + try (ResultSet rs = queryStatement.executeQuery()) { + while (rs.next()) { + String rsStoreName = rs.getString(1); + String rsTableName = rs.getString(2); + Timestamp timestamp = rs.getTimestamp(3); + + DatasetStateStoreEntryManager<T> entryManager = + new MysqlJobStatusStateStoreEntryManager<>(rsStoreName, rsTableName, timestamp.getTime(), this); + + if (predicate.apply(entryManager)) { + entryManagers.add(new MysqlJobStatusStateStoreEntryManager<T>(rsStoreName, rsTableName, timestamp.getTime(), this)); + } + } + } + } catch (SQLException e) { + throw new IOException("failure getting metadata for tables", e); + } + + return entryManagers; + } + + @Override + public Map<String, T> getLatestDatasetStatesByUrns(String jobName) { + throw new UnsupportedOperationException(); + } + + @Override + public T getLatestDatasetState(String storeName, String datasetUrn) { + throw new UnsupportedOperationException(); + } + + @Override + public void persistDatasetState(String datasetUrn, T datasetState) { + throw new UnsupportedOperationException(); + } + + @Override + public void persistDatasetURNs(String storeName, Collection<String> datasetUrns) { + throw new UnsupportedOperationException(); + } } diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreEntryManager.java similarity index 53% copy from gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java copy to gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreEntryManager.java index 666cb56..0ebe573 100644 --- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreEntryManager.java @@ -17,40 +17,30 @@ package org.apache.gobblin.metastore; -import java.util.List; +import java.io.IOException; -import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; -import lombok.Data; - /** - * A {@link Dataset} representing a group of entries in a {@link DatasetStateStore} with the same dataset urn. + * A {@link DatasetStateStoreEntryManager} generated by {@link MysqlJobStatusStateStore}. */ -@Data -public class DatasetStoreDataset implements Dataset { +public class MysqlJobStatusStateStoreEntryManager<T extends State> extends DatasetStateStoreEntryManager<T> { + private final MysqlJobStatusStateStore<T> stateStore; - private final Key key; - private final List<DatasetStateStoreEntryManager> datasetStateStoreMetadataEntries; + public MysqlJobStatusStateStoreEntryManager(String storeName, String tableName, long timestamp, MysqlJobStatusStateStore<T> mysqlJobStatusStateStore) { + super(storeName, tableName, timestamp, "", "", mysqlJobStatusStateStore); + this.stateStore = mysqlJobStatusStateStore; + } @Override - public String datasetURN() { - return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn(); + public T readState() throws IOException { + return this.stateStore.get(getStoreName(), getTableName(), ""); } - /** - * The key for a {@link DatasetStoreDataset}. 
- */ - @Data - public static class Key { - private final String storeName; - private final String sanitizedDatasetUrn; - - public Key(DatasetStateStoreEntryManager metadata) { - this.storeName = metadata.getStoreName(); - this.sanitizedDatasetUrn = metadata.getSanitizedDatasetUrn(); - } + @Override + public void delete() throws IOException { + this.stateStore.delete(getStoreName(), getTableName()); } - } diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java index f571046..5a2c684 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java @@ -27,7 +27,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.util.ConfigUtils; -public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory { +public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory implements DatasetStateStore.Factory { @Override public <T extends State> MysqlJobStatusStateStore<T> createStateStore(Config config, Class<T> stateClass) { String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, @@ -39,9 +39,14 @@ public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory { BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); - return new MysqlJobStatusStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass); + return new MysqlJobStatusStateStore<>(basicDataSource, stateStoreTableName, compressedValues, stateClass); } catch (Exception e) { throw new RuntimeException("Failed to create MysqlStateStore with factory", e); } } + + @Override + public <T extends State> MysqlJobStatusStateStore 
createStateStore(Config config) { + return createStateStore(config, State.class); + } } diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java index de84f52..e472ad1 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java @@ -77,7 +77,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { // Class of the state objects to be put into the store private final Class<T> stateClass; - private final DataSource dataSource; + protected final DataSource dataSource; private final boolean compressedValues; private static final String UPSERT_JOB_STATE_TEMPLATE = @@ -133,7 +133,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { private final String DELETE_JOB_STATE_SQL; private final String CLONE_JOB_STATE_SQL; private final String SELECT_STORE_NAMES_SQL; - private final String SELECT_METADATA_SQL; + protected final String SELECT_METADATA_SQL; /** * Manages the persistence and retrieval of {@link State} in a MySQL database diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java index 4af3730..fc9e362 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java @@ -39,7 +39,7 @@ public class MysqlStateStoreFactory implements StateStore.Factory { BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); - return new MysqlStateStore(basicDataSource, stateStoreTableName, compressedValues, stateClass); + return new MysqlStateStore<>(basicDataSource, 
stateStoreTableName, compressedValues, stateClass); } catch (Exception e) { throw new RuntimeException("Failed to create MysqlStateStore with factory", e); } diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java index 40cff79..392a4c3 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java @@ -43,12 +43,12 @@ public abstract class DatasetStateStoreEntryManager<T extends State> extends Sta private final DatasetStateStore datasetStateStore; public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp, - DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore datasetStateStore) { + DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore<T> datasetStateStore) { this(storeName, tableName, timestamp, tableNameParser.getSanitizedDatasetUrn(), tableNameParser.getStateId(), datasetStateStore); } public DatasetStateStoreEntryManager(String storeName, String tableName, long timestamp, String sanitizedDatasetUrn, - String stateId, DatasetStateStore datasetStateStore) { + String stateId, DatasetStateStore<T> datasetStateStore) { super(storeName, tableName, timestamp, datasetStateStore); this.sanitizedDatasetUrn = sanitizedDatasetUrn; this.stateId = stateId; diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java index 2b15efe..6e47f55 100644 --- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java +++ 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java @@ -21,7 +21,6 @@ import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.util.ConfigUtils; -import java.util.Properties; @Alias("zk")