This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b64cb3b  [GOBBLIN-1105] some refactoring and make 
MysqlJobStatusStateStore implements DatasetStateStore
b64cb3b is described below

commit b64cb3b3442e07b2906dfabe75376e3382fc9457
Author: Arjun <ab...@linkedin.com>
AuthorDate: Wed Apr 8 11:53:04 2020 -0700

    [GOBBLIN-1105] some refactoring and make MysqlJobStatusStateStore 
implements DatasetStateStore
    
    Closes #2945 from arjun4084346/PR-3
---
 .../apache/gobblin/cluster/HelixJobsMapping.java   |  8 +--
 .../gobblin/metastore/DatasetStoreDataset.java     |  5 +-
 .../metastore/MysqlJobStatusStateStore.java        | 78 +++++++++++++++++++---
 ...a => MysqlJobStatusStateStoreEntryManager.java} | 38 ++++-------
 .../metastore/MysqlJobStatusStateStoreFactory.java |  9 ++-
 .../apache/gobblin/metastore/MysqlStateStore.java  |  4 +-
 .../gobblin/metastore/MysqlStateStoreFactory.java  |  2 +-
 .../metadata/DatasetStateStoreEntryManager.java    |  4 +-
 .../gobblin/metastore/ZkStateStoreFactory.java     |  1 -
 9 files changed, 102 insertions(+), 47 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
index 8124ba1..11a90da 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java
@@ -79,12 +79,8 @@ public class HelixJobsMapping {
 
     try {
       stateStoreFactory = resolver.resolveClass(stateStoreType).newInstance();
-    } catch (ClassNotFoundException cnfe) {
-      throw new RuntimeException(cnfe);
-    } catch (InstantiationException ie) {
-      throw new RuntimeException(ie);
-    } catch (IllegalAccessException iae) {
-      throw new RuntimeException(iae);
+    } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
+      throw new RuntimeException(e);
     }
 
     String dbTableKey = ConfigUtils.getString(sysConfig, 
JOBS_MAPPING_DB_TABLE_KEY, DEFAULT_JOBS_MAPPING_DB_TABLE_KEY_NAME);
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
index 666cb56..45a1385 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.metastore;
 
 import java.util.List;
 
+import com.google.common.base.Strings;
+
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
 
@@ -36,7 +38,8 @@ public class DatasetStoreDataset implements Dataset {
 
   @Override
   public String datasetURN() {
-    return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn();
+    return this.key.getStoreName() +
+        (Strings.isNullOrEmpty(this.key.getSanitizedDatasetUrn()) ? "" : ":::" 
+ this.key.getSanitizedDatasetUrn());
   }
 
   /**
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
index 8c2117b..aa73b37 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStore.java
@@ -18,14 +18,25 @@
 package org.apache.gobblin.metastore;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
 
 import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
+
 
 @Slf4j
 /**
@@ -33,14 +44,14 @@ import org.apache.gobblin.configuration.State;
  *
  * @param <T> state object type
  **/
-public class MysqlJobStatusStateStore<T extends State> extends MysqlStateStore 
{
+public class MysqlJobStatusStateStore<T extends State> extends 
MysqlStateStore<T> implements DatasetStateStore<T> {
   /**
    * Manages the persistence and retrieval of {@link State} in a MySQL database
    * @param dataSource the {@link DataSource} object for connecting to MySQL
    * @param stateStoreTableName the table for storing the state in rows keyed 
by two levels (store_name, table_name)
    * @param compressedValues should values be compressed for storage?
    * @param stateClass class of the {@link State}s stored in this state store
-   * @throws IOException
+   * @throws IOException in case of failures
    */
   public MysqlJobStatusStateStore(DataSource dataSource, String 
stateStoreTableName, boolean compressedValues,
       Class<T> stateClass)
@@ -50,12 +61,63 @@ public class MysqlJobStatusStateStore<T extends State> 
extends MysqlStateStore {
 
   /**
    * Returns all the job statuses for a flow group, flow name, flow execution 
id
-   * @param storeName
-   * @param flowExecutionId
-   * @return
-   * @throws IOException
+   * @param storeName store name in the state store
+   * @param flowExecutionId Flow Execution Id
+   * @return list of states
+   * @throws IOException in case of failures
    */
   public List<T> getAll(String storeName, long flowExecutionId) throws 
IOException {
     return getAll(storeName, flowExecutionId + "%", true);
   }
+
+  @Override
+  public List<DatasetStateStoreEntryManager<T>> 
getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+    List<DatasetStateStoreEntryManager<T>> entryManagers = 
Lists.newArrayList();
+
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement queryStatement = 
connection.prepareStatement(SELECT_METADATA_SQL)) {
+      String storeName = predicate instanceof StoreNamePredicate ? 
((StoreNamePredicate) predicate).getStoreName() : "%";
+      queryStatement.setString(1, storeName);
+
+      try (ResultSet rs = queryStatement.executeQuery()) {
+        while (rs.next()) {
+          String rsStoreName = rs.getString(1);
+          String rsTableName = rs.getString(2);
+          Timestamp timestamp = rs.getTimestamp(3);
+
+          DatasetStateStoreEntryManager<T> entryManager =
+              new MysqlJobStatusStateStoreEntryManager<>(rsStoreName, 
rsTableName, timestamp.getTime(), this);
+
+          if (predicate.apply(entryManager)) {
+            entryManagers.add(new 
MysqlJobStatusStateStoreEntryManager<T>(rsStoreName, rsTableName, 
timestamp.getTime(), this));
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("failure getting metadata for tables", e);
+    }
+
+    return entryManagers;
+  }
+
+  @Override
+  public Map<String, T> getLatestDatasetStatesByUrns(String jobName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T getLatestDatasetState(String storeName, String datasetUrn) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void persistDatasetState(String datasetUrn, T datasetState) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void persistDatasetURNs(String storeName, Collection<String> 
datasetUrns) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreEntryManager.java
similarity index 53%
copy from 
gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
copy to 
gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreEntryManager.java
index 666cb56..0ebe573 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDataset.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreEntryManager.java
@@ -17,40 +17,30 @@
 
 package org.apache.gobblin.metastore;
 
-import java.util.List;
+import java.io.IOException;
 
-import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
 
-import lombok.Data;
-
 
 /**
- * A {@link Dataset} representing a group of entries in a {@link 
DatasetStateStore} with the same dataset urn.
+ * A {@link DatasetStateStoreEntryManager} generated by {@link 
MysqlJobStatusStateStore}.
  */
-@Data
-public class DatasetStoreDataset implements Dataset {
+public class MysqlJobStatusStateStoreEntryManager<T extends State> extends 
DatasetStateStoreEntryManager<T> {
+  private final MysqlJobStatusStateStore<T> stateStore;
 
-  private final Key key;
-  private final List<DatasetStateStoreEntryManager> 
datasetStateStoreMetadataEntries;
+  public MysqlJobStatusStateStoreEntryManager(String storeName, String 
tableName, long timestamp, MysqlJobStatusStateStore<T> 
mysqlJobStatusStateStore) {
+    super(storeName, tableName, timestamp, "", "", mysqlJobStatusStateStore);
+    this.stateStore = mysqlJobStatusStateStore;
+  }
 
   @Override
-  public String datasetURN() {
-    return this.key.getStoreName() + ":::" + this.key.getSanitizedDatasetUrn();
+  public T readState() throws IOException {
+    return this.stateStore.get(getStoreName(), getTableName(), "");
   }
 
-  /**
-   * The key for a {@link DatasetStoreDataset}.
-   */
-  @Data
-  public static class Key {
-    private final String storeName;
-    private final String sanitizedDatasetUrn;
-
-    public Key(DatasetStateStoreEntryManager metadata) {
-      this.storeName = metadata.getStoreName();
-      this.sanitizedDatasetUrn = metadata.getSanitizedDatasetUrn();
-    }
+  @Override
+  public void delete() throws IOException {
+    this.stateStore.delete(getStoreName(), getTableName());
   }
-
 }
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
index f571046..5a2c684 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlJobStatusStateStoreFactory.java
@@ -27,7 +27,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ConfigUtils;
 
 
-public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory {
+public class MysqlJobStatusStateStoreFactory extends MysqlStateStoreFactory 
implements DatasetStateStore.Factory {
   @Override
   public <T extends State> MysqlJobStatusStateStore<T> createStateStore(Config 
config, Class<T> stateClass) {
     String stateStoreTableName = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
@@ -39,9 +39,14 @@ public class MysqlJobStatusStateStoreFactory extends 
MysqlStateStoreFactory {
       BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
           SharedResourcesBrokerFactory.getImplicitBroker());
 
-      return new MysqlJobStatusStateStore(basicDataSource, 
stateStoreTableName, compressedValues, stateClass);
+      return new MysqlJobStatusStateStore<>(basicDataSource, 
stateStoreTableName, compressedValues, stateClass);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create MysqlStateStore with 
factory", e);
     }
   }
+
+  @Override
+  public <T extends State> MysqlJobStatusStateStore createStateStore(Config 
config) {
+    return createStateStore(config, State.class);
+  }
 }
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index de84f52..e472ad1 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -77,7 +77,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
 
   // Class of the state objects to be put into the store
   private final Class<T> stateClass;
-  private final DataSource dataSource;
+  protected final DataSource dataSource;
   private final boolean compressedValues;
 
   private static final String UPSERT_JOB_STATE_TEMPLATE =
@@ -133,7 +133,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
   private final String DELETE_JOB_STATE_SQL;
   private final String CLONE_JOB_STATE_SQL;
   private final String SELECT_STORE_NAMES_SQL;
-  private final String SELECT_METADATA_SQL;
+  protected final String SELECT_METADATA_SQL;
 
   /**
    * Manages the persistence and retrieval of {@link State} in a MySQL database
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
index 4af3730..fc9e362 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreFactory.java
@@ -39,7 +39,7 @@ public class MysqlStateStoreFactory implements 
StateStore.Factory {
       BasicDataSource basicDataSource = MysqlDataSourceFactory.get(config,
           SharedResourcesBrokerFactory.getImplicitBroker());
 
-      return new MysqlStateStore(basicDataSource, stateStoreTableName, 
compressedValues, stateClass);
+      return new MysqlStateStore<>(basicDataSource, stateStoreTableName, 
compressedValues, stateClass);
     } catch (Exception e) {
       throw new RuntimeException("Failed to create MysqlStateStore with 
factory", e);
     }
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
index 40cff79..392a4c3 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
@@ -43,12 +43,12 @@ public abstract class DatasetStateStoreEntryManager<T 
extends State> extends Sta
   private final DatasetStateStore datasetStateStore;
 
   public DatasetStateStoreEntryManager(String storeName, String tableName, 
long timestamp,
-      DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore 
datasetStateStore) {
+      DatasetStateStore.TableNameParser tableNameParser, DatasetStateStore<T> 
datasetStateStore) {
     this(storeName, tableName, timestamp, 
tableNameParser.getSanitizedDatasetUrn(), tableNameParser.getStateId(), 
datasetStateStore);
   }
 
   public DatasetStateStoreEntryManager(String storeName, String tableName, 
long timestamp, String sanitizedDatasetUrn,
-      String stateId, DatasetStateStore datasetStateStore) {
+      String stateId, DatasetStateStore<T> datasetStateStore) {
     super(storeName, tableName, timestamp, datasetStateStore);
     this.sanitizedDatasetUrn = sanitizedDatasetUrn;
     this.stateId = stateId;
diff --git 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java
 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java
index 2b15efe..6e47f55 100644
--- 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java
+++ 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStoreFactory.java
@@ -21,7 +21,6 @@ import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.ConfigUtils;
-import java.util.Properties;
 
 
 @Alias("zk")

Reply via email to