This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a49cdb7d [GOBBLIN-1787] Ability to delete multiple watermarks in a state store (#3645)
0a49cdb7d is described below

commit 0a49cdb7db62171a1abcfdbb1934a24477807812
Author: Matthew Ho <[email protected]>
AuthorDate: Fri Feb 17 16:37:27 2023 -0800

    [GOBBLIN-1787] Ability to delete multiple watermarks in a state store (#3645)
    
    * [GOBBLIN-1787] Ability to delete watermarks in a state store
    
    * Remove magic number used in non-create calls
---
 .../org/apache/gobblin/metastore/StateStore.java   | 14 +++++++
 .../runtime/StateStoreBasedWatermarkStorage.java   |  9 ++++-
 .../org/apache/gobblin/metastore/ZkStateStore.java | 47 ++++++++++++++--------
 .../gobblin/runtime/ZkDatasetStateStoreTest.java   | 10 ++---
 4 files changed, 57 insertions(+), 23 deletions(-)

diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 0a4cf4350..67db300a4 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -205,6 +205,20 @@ public interface StateStore<T extends State> {
   public void delete(String storeName, String tableName)
       throws IOException;
 
+  /**
+   * Delete a list of tables from a store.
+   *
+   * @param storeName store name
+   * @param tableNames List of table names in the state store to delete
+   * @throws IOException
+   */
+  default void delete(String storeName, List<String> tableNames)
+      throws IOException {
+    for (String tableName : tableNames) {
+      delete(storeName, tableName);
+    }
+  }
+
   /**
    * Delete a store.
    *
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index deb93ea1f..d471a6889 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -55,10 +56,10 @@ public class StateStoreBasedWatermarkStorage implements 
WatermarkStorage {
    * A watermark prefix that is compatible with different watermark storage 
implementations.
    * As such, this prefix should not include any characters disallowed in a 
{@link java.net.URI}.
    */
-  private static final String WATERMARK_STORAGE_PREFIX="streamingWatermarks_";
+  protected static final String 
WATERMARK_STORAGE_PREFIX="streamingWatermarks_";
 
   public final StateStore<CheckpointableWatermarkState> _stateStore;
-  private final String _storeName;
+  protected final String _storeName;
 
   /**
    * A private method that creates a state store config
@@ -142,4 +143,8 @@ public class StateStoreBasedWatermarkStorage implements 
WatermarkStorage {
     return _stateStore.getAll(_storeName);
   }
 
+  public void deleteWatermarks(List<String> tableNames) throws IOException {
+    _stateStore.delete(_storeName, tableNames);
+  }
+
 }
diff --git 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index ea321a364..1c0b9ea09 100644
--- 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++ 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -65,6 +66,14 @@ import 
org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
  * @param <T> state object type
  **/
 public class ZkStateStore<T extends State> implements StateStore<T> {
+  /**
+   * Corresponds to {@link AccessOption}, which defines behavior for accessing znodes (get, remove, exists). The value 0 means not to
+   * throw exceptions if the znode does not exist (i.e. do not enable {@link AccessOption#THROW_EXCEPTION_IFNOTEXIST}).
+   *
+   * Note: This constant is not meant to be used for create calls like {@link HelixPropertyStore#create(String, Object, int)},
+   * which require specifying whether the znode is {@link AccessOption#PERSISTENT}, {@link AccessOption#EPHEMERAL}, etc.
+   **/
+  private static final int DEFAULT_OPTION = 0;
 
   // Class of the state objects to be put into the store
   private final Class<T> stateClass;
@@ -101,7 +110,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   public boolean create(String storeName) throws IOException {
     String path = formPath(storeName);
 
-    return propStore.exists(path, 0) || propStore.create(path, 
ArrayUtils.EMPTY_BYTE_ARRAY,
+    return propStore.exists(path, DEFAULT_OPTION) || propStore.create(path, 
ArrayUtils.EMPTY_BYTE_ARRAY,
         AccessOption.PERSISTENT);
   }
 
@@ -109,7 +118,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   public boolean create(String storeName, String tableName) throws IOException 
{
     String path = formPath(storeName, tableName);
 
-    if (propStore.exists(path, 0)) {
+    if (propStore.exists(path, DEFAULT_OPTION)) {
       throw new IOException(String.format("State already exists for storeName 
%s tableName %s", storeName,
           tableName));
     }
@@ -121,7 +130,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   public boolean exists(String storeName, String tableName) throws IOException 
{
     String path = formPath(storeName, tableName);
 
-    return propStore.exists(path, 0);
+    return propStore.exists(path, DEFAULT_OPTION);
   }
 
   /**
@@ -145,7 +154,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   private void putData(String storeName, String tableName, byte[] data) throws 
IOException {
     String path = formPath(storeName, tableName);
 
-    if (!propStore.exists(path, 0)) {
+    if (!propStore.exists(path, DEFAULT_OPTION)) {
       // create with data
       if (!propStore.create(path, data, AccessOption.PERSISTENT)) {
         throw new IOException("Failed to create a state file for table " + 
tableName);
@@ -179,7 +188,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   @Override
   public T get(String storeName, String tableName, String stateId) throws 
IOException {
     String path = formPath(storeName, tableName);
-    byte[] data = propStore.get(path, null, 0);
+    byte[] data = propStore.get(path, null, DEFAULT_OPTION);
     List<T> states = Lists.newArrayList();
 
     deserialize(data, states, stateId);
@@ -187,7 +196,7 @@ public class ZkStateStore<T extends State> implements StateStore<T> {
     if (states.isEmpty()) {
       return null;
     } else {
       return states.get(0);
     }
   }
 
@@ -203,7 +212,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
     String path = formPath(storeName);
     byte[] data;
 
-    List<String> children = propStore.getChildNames(path, 0);
+    List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);
 
     if (children == null) {
       return Collections.emptyList();
@@ -211,7 +220,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
 
     for (String c : children) {
       if (predicate.apply(c)) {
-        data = propStore.get(path + "/" + c, null, 0);
+        data = propStore.get(path + "/" + c, null, DEFAULT_OPTION);
         deserialize(data, states);
       }
     }
@@ -223,7 +232,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   public List<T> getAll(String storeName, String tableName) throws IOException 
{
     List<T> states = Lists.newArrayList();
     String path = formPath(storeName, tableName);
-    byte[] data = propStore.get(path, null, 0);
+    byte[] data = propStore.get(path, null, DEFAULT_OPTION);
 
     deserialize(data, states);
 
@@ -240,7 +249,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
     List<String> names = Lists.newArrayList();
     String path = formPath(storeName);
 
-    List<String> children = propStore.getChildNames(path, 0);
+    List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);
 
     if (children != null) {
       for (String c : children) {
@@ -265,7 +274,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
     List<String> names = Lists.newArrayList();
     String path = formPath("");
 
-    List<String> children = propStore.getChildNames(path, 0);
+    List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);
 
     if (children != null) {
       for (String c : children) {
@@ -283,23 +292,29 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
     String pathOriginal = formPath(storeName, original);
     byte[] data;
 
-    if (!propStore.exists(pathOriginal, 0)) {
+    if (!propStore.exists(pathOriginal, DEFAULT_OPTION)) {
       throw new IOException(String.format("State does not exist for table %s", 
original));
     }
 
-    data = propStore.get(pathOriginal, null, 0);
+    data = propStore.get(pathOriginal, null, DEFAULT_OPTION);
 
     putData(storeName, alias, data);
   }
 
   @Override
   public void delete(String storeName, String tableName) throws IOException {
-    propStore.remove(formPath(storeName, tableName), 0);
+    propStore.remove(formPath(storeName, tableName), DEFAULT_OPTION);
+  }
+
+  @Override
+  public void delete(String storeName, List<String> tableNames) throws 
IOException {
+    List<String> paths = tableNames.stream().map(table -> formPath(storeName, 
table)).collect(Collectors.toList());
+    propStore.remove(paths, DEFAULT_OPTION);
   }
 
   @Override
   public void delete(String storeName) throws IOException {
-    propStore.remove(formPath(storeName), 0);
+    propStore.remove(formPath(storeName), DEFAULT_OPTION);
   }
 
   /**
@@ -348,4 +363,4 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
   private void deserialize(byte[] data, List<T> states) throws IOException {
     deserialize(data, states, null);
   }
-}
\ No newline at end of file
+}
diff --git 
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
 
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 71e5cbc92..0ab4faf32 100644
--- 
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ 
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -261,14 +261,14 @@ public class ZkDatasetStateStoreTest {
 
   @Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
   public void testDeleteDatasetJobState() throws IOException {
-    JobState.DatasetState datasetState = zkDatasetStateStore.get(TEST_JOB_NAME,
-        TEST_DATASET_URN + "-" + 
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
-            zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, 
TEST_DATASET_URN);
+    String tableName = TEST_DATASET_URN + "-" + 
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
+        zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX;
+    JobState.DatasetState datasetState = 
zkDatasetStateStore.get(TEST_JOB_NAME, tableName, TEST_DATASET_URN);
 
     Assert.assertNotNull(datasetState);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
 
-    zkDatasetStateStore.delete(TEST_JOB_NAME);
+    zkDatasetStateStore.delete(TEST_JOB_NAME, 
Collections.singletonList(tableName));
 
     datasetState = zkDatasetStateStore.get(TEST_JOB_NAME,
         TEST_DATASET_URN + "-" + 
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
@@ -288,4 +288,4 @@ public class ZkDatasetStateStoreTest {
       testingServer.close();
     }
   }
-}
\ No newline at end of file
+}

Reply via email to