This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0a49cdb7d [GOBBLIN-1787] Ability to delete multiple watermarks in a
state store (#3645)
0a49cdb7d is described below
commit 0a49cdb7db62171a1abcfdbb1934a24477807812
Author: Matthew Ho <[email protected]>
AuthorDate: Fri Feb 17 16:37:27 2023 -0800
[GOBBLIN-1787] Ability to delete multiple watermarks in a state store
(#3645)
* [GOBBLIN-1787] Ability to delete watermarks in a state store
* Remove magic number used in non-create calls
---
.../org/apache/gobblin/metastore/StateStore.java | 14 +++++++
.../runtime/StateStoreBasedWatermarkStorage.java | 9 ++++-
.../org/apache/gobblin/metastore/ZkStateStore.java | 47 ++++++++++++++--------
.../gobblin/runtime/ZkDatasetStateStoreTest.java | 10 ++---
4 files changed, 57 insertions(+), 23 deletions(-)
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
index 0a4cf4350..67db300a4 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java
@@ -205,6 +205,20 @@ public interface StateStore<T extends State> {
public void delete(String storeName, String tableName)
throws IOException;
+ /**
+ * Delete a list of tables from a store.
+ *
+ * @param storeName store name
+ * @param tableNames List of table names in the state store to delete
+ * @throws IOException
+ */
+ default void delete(String storeName, List<String> tableNames)
+ throws IOException {
+ for (String tableName : tableNames) {
+ delete(storeName, tableName);
+ }
+ }
+
/**
* Delete a store.
*
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index deb93ea1f..d471a6889 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -55,10 +56,10 @@ public class StateStoreBasedWatermarkStorage implements
WatermarkStorage {
* A watermark prefix that is compatible with different watermark storage
implementations.
* As such, this prefix should not include any characters disallowed in a
{@link java.net.URI}.
*/
- private static final String WATERMARK_STORAGE_PREFIX="streamingWatermarks_";
+ protected static final String
WATERMARK_STORAGE_PREFIX="streamingWatermarks_";
public final StateStore<CheckpointableWatermarkState> _stateStore;
- private final String _storeName;
+ protected final String _storeName;
/**
* A private method that creates a state store config
@@ -142,4 +143,8 @@ public class StateStoreBasedWatermarkStorage implements
WatermarkStorage {
return _stateStore.getAll(_storeName);
}
+ public void deleteWatermarks(List<String> tableNames) throws IOException {
+ _stateStore.delete(_storeName, tableNames);
+ }
+
}
diff --git
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index ea321a364..1c0b9ea09 100644
---
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -65,6 +66,14 @@ import
org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
* @param <T> state object type
**/
public class ZkStateStore<T extends State> implements StateStore<T> {
+ /**
+ * Corresponds to {@link AccessOption}, which defines behavior for accessing
znodes (get, remove, exists). The value 0 means not to
+ * throw exceptions if the znode does not exist (i.e. do not enable {@link
AccessOption#THROW_EXCEPTION_IFNOTEXIST}).
+ *
+ * Note: This variable must not be used for create calls like {@link
HelixPropertyStore#create(String, Object, int)},
+ * which require specifying if the znode is {@link AccessOption#PERSISTENT},
{@link AccessOption#EPHEMERAL}, etc.
+ **/
+ private static final int DEFAULT_OPTION = 0;
// Class of the state objects to be put into the store
private final Class<T> stateClass;
@@ -101,7 +110,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
public boolean create(String storeName) throws IOException {
String path = formPath(storeName);
- return propStore.exists(path, 0) || propStore.create(path,
ArrayUtils.EMPTY_BYTE_ARRAY,
+ return propStore.exists(path, DEFAULT_OPTION) || propStore.create(path,
ArrayUtils.EMPTY_BYTE_ARRAY,
AccessOption.PERSISTENT);
}
@@ -109,7 +118,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
public boolean create(String storeName, String tableName) throws IOException
{
String path = formPath(storeName, tableName);
- if (propStore.exists(path, 0)) {
+ if (propStore.exists(path, DEFAULT_OPTION)) {
throw new IOException(String.format("State already exists for storeName
%s tableName %s", storeName,
tableName));
}
@@ -121,7 +130,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
public boolean exists(String storeName, String tableName) throws IOException
{
String path = formPath(storeName, tableName);
- return propStore.exists(path, 0);
+ return propStore.exists(path, DEFAULT_OPTION);
}
/**
@@ -145,7 +154,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
private void putData(String storeName, String tableName, byte[] data) throws
IOException {
String path = formPath(storeName, tableName);
- if (!propStore.exists(path, 0)) {
+ if (!propStore.exists(path, DEFAULT_OPTION)) {
// create with data
if (!propStore.create(path, data, AccessOption.PERSISTENT)) {
throw new IOException("Failed to create a state file for table " +
tableName);
@@ -179,7 +188,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
@Override
public T get(String storeName, String tableName, String stateId) throws
IOException {
String path = formPath(storeName, tableName);
- byte[] data = propStore.get(path, null, 0);
+ byte[] data = propStore.get(path, null, DEFAULT_OPTION);
List<T> states = Lists.newArrayList();
deserialize(data, states, stateId);
@@ -187,7 +196,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
if (states.isEmpty()) {
return null;
} else {
- return states.get(0);
+ return states.get(DEFAULT_OPTION);
}
}
@@ -203,7 +212,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
String path = formPath(storeName);
byte[] data;
- List<String> children = propStore.getChildNames(path, 0);
+ List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);
if (children == null) {
return Collections.emptyList();
@@ -211,7 +220,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
for (String c : children) {
if (predicate.apply(c)) {
- data = propStore.get(path + "/" + c, null, 0);
+ data = propStore.get(path + "/" + c, null, DEFAULT_OPTION);
deserialize(data, states);
}
}
@@ -223,7 +232,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
public List<T> getAll(String storeName, String tableName) throws IOException
{
List<T> states = Lists.newArrayList();
String path = formPath(storeName, tableName);
- byte[] data = propStore.get(path, null, 0);
+ byte[] data = propStore.get(path, null, DEFAULT_OPTION);
deserialize(data, states);
@@ -240,7 +249,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
List<String> names = Lists.newArrayList();
String path = formPath(storeName);
- List<String> children = propStore.getChildNames(path, 0);
+ List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);
if (children != null) {
for (String c : children) {
@@ -265,7 +274,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
List<String> names = Lists.newArrayList();
String path = formPath("");
- List<String> children = propStore.getChildNames(path, 0);
+ List<String> children = propStore.getChildNames(path, DEFAULT_OPTION);
if (children != null) {
for (String c : children) {
@@ -283,23 +292,29 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
String pathOriginal = formPath(storeName, original);
byte[] data;
- if (!propStore.exists(pathOriginal, 0)) {
+ if (!propStore.exists(pathOriginal, DEFAULT_OPTION)) {
throw new IOException(String.format("State does not exist for table %s",
original));
}
- data = propStore.get(pathOriginal, null, 0);
+ data = propStore.get(pathOriginal, null, DEFAULT_OPTION);
putData(storeName, alias, data);
}
@Override
public void delete(String storeName, String tableName) throws IOException {
- propStore.remove(formPath(storeName, tableName), 0);
+ propStore.remove(formPath(storeName, tableName), DEFAULT_OPTION);
+ }
+
+ @Override
+ public void delete(String storeName, List<String> tableNames) throws
IOException {
+ List<String> paths = tableNames.stream().map(table -> formPath(storeName,
table)).collect(Collectors.toList());
+ propStore.remove(paths, DEFAULT_OPTION);
}
@Override
public void delete(String storeName) throws IOException {
- propStore.remove(formPath(storeName), 0);
+ propStore.remove(formPath(storeName), DEFAULT_OPTION);
}
/**
@@ -348,4 +363,4 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
private void deserialize(byte[] data, List<T> states) throws IOException {
deserialize(data, states, null);
}
-}
\ No newline at end of file
+}
diff --git
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 71e5cbc92..0ab4faf32 100644
---
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -261,14 +261,14 @@ public class ZkDatasetStateStoreTest {
@Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
public void testDeleteDatasetJobState() throws IOException {
- JobState.DatasetState datasetState = zkDatasetStateStore.get(TEST_JOB_NAME,
- TEST_DATASET_URN + "-" +
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
- zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_DATASET_URN);
+ String tableName = TEST_DATASET_URN + "-" +
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
+ zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX;
+ JobState.DatasetState datasetState =
zkDatasetStateStore.get(TEST_JOB_NAME, tableName, TEST_DATASET_URN);
Assert.assertNotNull(datasetState);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
- zkDatasetStateStore.delete(TEST_JOB_NAME);
+ zkDatasetStateStore.delete(TEST_JOB_NAME,
Collections.singletonList(tableName));
datasetState = zkDatasetStateStore.get(TEST_JOB_NAME,
TEST_DATASET_URN + "-" +
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
@@ -288,4 +288,4 @@ public class ZkDatasetStateStoreTest {
testingServer.close();
}
}
-}
\ No newline at end of file
+}