This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 72c2e77998 Update minion task metadata ZNode path (#8959)
72c2e77998 is described below
commit 72c2e77998b19a358f9419c2d898764e0874f095
Author: Haitao Zhang <[email protected]>
AuthorDate: Tue Jun 28 16:11:58 2022 -0700
Update minion task metadata ZNode path (#8959)
* update minion task metadata path
* add test cases
* fix format issue
* rename methods
* fix logic error
* updated comments
* change parameter order
---
.../segmentpreselector/SegmentPreSelectorTest.java | 2 +-
.../pinot/common/metadata/ZKMetadataProvider.java | 32 ++---
.../pinot/common/minion/BaseTaskMetadata.java | 2 +-
.../common/minion/MergeRollupTaskMetadata.java | 2 +-
.../common/minion/MinionTaskMetadataUtils.java | 52 ++++++--
.../RealtimeToOfflineSegmentsTaskMetadata.java | 2 +-
.../common/utils/helix}/FakePropertyStore.java | 13 +-
.../common/minion/MinionTaskMetadataUtilsTest.java | 144 ++++++++++++++++++---
.../helix/core/minion/ClusterInfoAccessor.java | 3 +-
.../executor/MinionTaskZkMetadataManager.java | 5 +-
.../mergerollup/MergeRollupTaskGenerator.java | 2 +-
.../RealtimeToOfflineSegmentsTaskExecutor.java | 2 +-
.../RealtimeToOfflineSegmentsTaskGenerator.java | 2 +-
13 files changed, 211 insertions(+), 52 deletions(-)
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
index 545845cf22..44abd8053e 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
@@ -26,12 +26,12 @@ import java.util.Set;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.broker.util.FakePropertyStore;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
+import org.apache.pinot.common.utils.helix.FakePropertyStore;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 0633e32672..05fcbf1e50 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -66,8 +66,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX =
"/MINION_TASK_METADATA";
public static void setUserConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String username, ZNRecord znRecord) {
- propertyStore
- .set(constructPropertyStorePathForUserConfig(username), znRecord,
AccessOption.PERSISTENT);
+ propertyStore.set(constructPropertyStorePathForUserConfig(username),
znRecord, AccessOption.PERSISTENT);
}
public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String realtimeTableName,
@@ -130,7 +129,13 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE,
tableNameWithType);
}
- public static String constructPropertyStorePathForMinionTaskMetadata(String
taskType, String tableNameWithType) {
+ public static String constructPropertyStorePathForMinionTaskMetadata(String
tableNameWithType, String taskType) {
+ return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
tableNameWithType, taskType);
+ }
+
+ @Deprecated
+ public static String
constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
+ String tableNameWithType) {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
taskType, tableNameWithType);
}
@@ -156,8 +161,7 @@ public class ZKMetadataProvider {
}
}
- public static void
removeUserConfigFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore,
- String username) {
+ public static void
removeUserConfigFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore,
String username) {
String propertyStorePath =
constructPropertyStorePathForUserConfig(username);
if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
@@ -224,8 +228,8 @@ public class ZKMetadataProvider {
@Nullable
public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String username) {
- ZNRecord znRecord = propertyStore
- .get(constructPropertyStorePathForUserConfig(username), null,
AccessOption.PERSISTENT);
+ ZNRecord znRecord =
+ propertyStore.get(constructPropertyStorePathForUserConfig(username),
null, AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
@@ -240,14 +244,13 @@ public class ZKMetadataProvider {
@Nullable
public static List<UserConfig>
getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
- List<ZNRecord> znRecordss = propertyStore
- .getChildren(PROPERTYSTORE_USER_CONFIGS_PREFIX, null,
AccessOption.PERSISTENT);
+ List<ZNRecord> znRecordss =
+ propertyStore.getChildren(PROPERTYSTORE_USER_CONFIGS_PREFIX, null,
AccessOption.PERSISTENT);
try {
- return Optional.ofNullable(znRecordss)
- .orElseGet(() -> {
- return new ArrayList<>();
-
}).stream().map(AccessControlUserConfigUtils::fromZNRecord).collect(Collectors.toList());
+ return Optional.ofNullable(znRecordss).orElseGet(() -> {
+ return new ArrayList<>();
+
}).stream().map(AccessControlUserConfigUtils::fromZNRecord).collect(Collectors.toList());
} catch (Exception e) {
LOGGER.error("Caught exception while getting user list configuration",
e);
return null;
@@ -256,8 +259,7 @@ public class ZKMetadataProvider {
@Nullable
public static List<String> getAllUserName(ZkHelixPropertyStore<ZNRecord>
propertyStore) {
- return propertyStore
- .getChildNames(PROPERTYSTORE_USER_CONFIGS_PREFIX,
AccessOption.PERSISTENT);
+ return propertyStore.getChildNames(PROPERTYSTORE_USER_CONFIGS_PREFIX,
AccessOption.PERSISTENT);
}
@Nullable
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
index 4e8998912b..3135c98544 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
@@ -27,7 +27,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
* Base abstract class for minion task metadata.
*
* This metadata gets serialized and stored in zookeeper under the path:
- * MINION_TASK_METADATA/${taskName}/${tableNameWithType}
+ * MINION_TASK_METADATA/${tableNameWithType}/${taskName}
*/
public abstract class BaseTaskMetadata {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
index 3ed10b786d..4484ace493 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
@@ -28,7 +28,7 @@ import org.apache.helix.ZNRecord;
* The <code>watermarkMap</code> denotes the time (exclusive) upto which tasks
have been executed for the bucket
* granularity.
*
- * This gets serialized and stored in zookeeper under the path
MINION_TASK_METADATA/MergeRollupTask/tableNameWithType
+ * This gets serialized and stored in zookeeper under the path
MINION_TASK_METADATA/${tableNameWithType}/MergeRollupTask
*/
public class MergeRollupTaskMetadata extends BaseTaskMetadata {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
index 5a1120218b..2213b5273f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
@@ -36,12 +36,24 @@ public final class MinionTaskMetadataUtils {
}
/**
- * Fetches the ZNRecord for the given minion task and tableName, from
MINION_TASK_METADATA/taskName/tableNameWthType
+ * Fetches the ZNRecord for the given minion task and tableName. Fetch from
the new path
+ * MINION_TASK_METADATA/${tableNameWithType}/${taskType} if it exists;
otherwise, fetch from the old path
+ * MINION_TASK_METADATA/${taskType}/${tableNameWithType}.
*/
@Nullable
public static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord>
propertyStore, String taskType,
String tableNameWithType) {
- String path =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
tableNameWithType);
+ String newPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType,
taskType);
+ if (propertyStore.exists(newPath, AccessOption.PERSISTENT)) {
+ return fetchTaskMetadata(propertyStore, newPath);
+ } else {
+ return fetchTaskMetadata(propertyStore,
+
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
tableNameWithType));
+ }
+ }
+
+ @Nullable
+ private static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord>
propertyStore, String path) {
Stat stat = new Stat();
ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
if (znRecord != null) {
@@ -51,29 +63,49 @@ public final class MinionTaskMetadataUtils {
}
/**
- * Deletes the ZNRecord for the given minion task and tableName, from
MINION_TASK_METADATA/taskName/tableNameWthType
+ * Deletes the ZNRecord for the given minion task and tableName, from both
the new path
+ * MINION_TASK_METADATA/${tableNameWithType}/${taskType} and the old path
+ * MINION_TASK_METADATA/${taskType}/${tableNameWithType}.
*/
public static void deleteTaskMetadata(HelixPropertyStore<ZNRecord>
propertyStore, String taskType,
String tableNameWithType) {
- String path =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
tableNameWithType);
- if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+ String newPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType,
taskType);
+ String oldPath =
+
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
tableNameWithType);
+ boolean newPathDeleted = propertyStore.remove(newPath,
AccessOption.PERSISTENT);
+ boolean oldPathDeleted = propertyStore.remove(oldPath,
AccessOption.PERSISTENT);
+ if (!newPathDeleted || !oldPathDeleted) {
throw new ZkException("Failed to delete task metadata: " + taskType + ",
" + tableNameWithType);
}
}
/**
- * Generic method for persisting {@link BaseTaskMetadata} to
MINION_TASK_METADATA. The metadata will be saved in the
- * ZNode under the path:
/MINION_TASK_METADATA/${taskType}/${tableNameWithType}
+ * Generic method for persisting {@link BaseTaskMetadata} to
MINION_TASK_METADATA. The metadata will
+ * be saved in the ZNode under the new path
/MINION_TASK_METADATA/${tableNameWithType}/${taskType} if
+ * the new path already exists or the old path does not exist; otherwise
(i.e. only the old path exists), it will be saved in the ZNode under the old path
+ * /MINION_TASK_METADATA/${taskType}/${tableNameWithType}.
*
* Will fail if expectedVersion does not match.
* Set expectedVersion -1 to override version check.
*/
public static void persistTaskMetadata(HelixPropertyStore<ZNRecord>
propertyStore, String taskType,
BaseTaskMetadata taskMetadata, int expectedVersion) {
- String path =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
+ String newPath =
+
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskMetadata.getTableNameWithType(),
+ taskType);
+ String oldPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
taskMetadata.getTableNameWithType());
- if (!propertyStore
- .set(path, taskMetadata.toZNRecord(), expectedVersion,
AccessOption.PERSISTENT)) {
+ if (propertyStore.exists(newPath, AccessOption.PERSISTENT) ||
!propertyStore.exists(oldPath,
+ AccessOption.PERSISTENT)) {
+ persistTaskMetadata(newPath, propertyStore, taskType, taskMetadata,
expectedVersion);
+ } else {
+ persistTaskMetadata(oldPath, propertyStore, taskType, taskMetadata,
expectedVersion);
+ }
+ }
+
+ private static void persistTaskMetadata(String path,
HelixPropertyStore<ZNRecord> propertyStore, String taskType,
+ BaseTaskMetadata taskMetadata, int expectedVersion) {
+ if (!propertyStore.set(path, taskMetadata.toZNRecord(), expectedVersion,
AccessOption.PERSISTENT)) {
throw new ZkException(
"Failed to persist minion metadata for task: " + taskType + " and
metadata: " + taskMetadata);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
index 1ce9e09c1b..6d9ba9e9d0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
@@ -26,7 +26,7 @@ import org.apache.helix.ZNRecord;
* The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks
have been executed.
*
* This gets serialized and stored in zookeeper under the path
- * MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
*
* PinotTaskGenerator:
* The <code>watermarkMs</code>> is used by the
<code>RealtimeToOfflineSegmentsTaskGenerator</code>,
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/util/FakePropertyStore.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
similarity index 89%
rename from
pinot-broker/src/test/java/org/apache/pinot/broker/util/FakePropertyStore.java
rename to
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
index fa3e9cafd2..362771b206 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/util/FakePropertyStore.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.broker.util;
+package org.apache.pinot.common.utils.helix;
import java.util.HashMap;
import java.util.Map;
@@ -40,6 +40,11 @@ public class FakePropertyStore extends
ZkHelixPropertyStore<ZNRecord> {
return _contents.get(path);
}
+ @Override
+ public boolean exists(String path, int options) {
+ return _contents.containsKey(path);
+ }
+
@Override
public void subscribeDataChanges(String path, IZkDataListener listener) {
_listener = listener;
@@ -65,6 +70,12 @@ public class FakePropertyStore extends
ZkHelixPropertyStore<ZNRecord> {
}
}
+ @Override
+ public boolean remove(String path, int options) {
+ _contents.remove(path);
+ return true;
+ }
+
public void setContents(String path, ZNRecord contents)
throws Exception {
_contents.put(path, contents);
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
index 74d247f875..cf5a613780 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
@@ -22,50 +22,162 @@ import org.I0Itec.zkclient.exception.ZkException;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.helix.FakePropertyStore;
+import org.apache.zookeeper.data.Stat;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
/**
* Tests for {@link MinionTaskMetadataUtils}
*/
public class MinionTaskMetadataUtilsTest {
+ private static final String TABLE_NAME_WITH_TYPE = "TestTable_OFFLINE";
+ private static final String TASK_TYPE = "TestTaskType";
+ private static final String NEW_MINION_METADATA_PATH =
+
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(TABLE_NAME_WITH_TYPE,
TASK_TYPE);
+ private static final String OLD_MINION_METADATA_PATH =
+
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(TASK_TYPE,
TABLE_NAME_WITH_TYPE);
+ private static final DummyTaskMetadata NEW_TASK_METADATA = new
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 1000);
+ private static final DummyTaskMetadata OLD_TASK_METADATA = new
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 100);
+ private static final int EXPECTED_VERSION = -1;
+ private static final int ACCESS_OPTION = AccessOption.PERSISTENT;
+
+ @Test
+ public void testFetchTaskMetadata() {
+ // no metadata path exists
+ HelixPropertyStore<ZNRecord> propertyStore = new FakePropertyStore();
+ assertNull(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE));
+
+ // only the old metadata path exists
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(OLD_MINION_METADATA_PATH,
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ OLD_TASK_METADATA.toZNRecord());
+
+ // only the new metadata path exists
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(NEW_MINION_METADATA_PATH,
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ NEW_TASK_METADATA.toZNRecord());
+
+ // if two metadata paths exist at the same time, the new one will be used.
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(OLD_MINION_METADATA_PATH,
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ propertyStore.set(NEW_MINION_METADATA_PATH,
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ NEW_TASK_METADATA.toZNRecord());
+ }
+
+ @Test
+ public void testDeleteTaskMetadata() {
+ // no error
+ HelixPropertyStore<ZNRecord> propertyStore = new FakePropertyStore();
+ MinionTaskMetadataUtils.deleteTaskMetadata(propertyStore, TASK_TYPE,
TABLE_NAME_WITH_TYPE);
+
+ // both metadata paths will be removed
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(OLD_MINION_METADATA_PATH,
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ propertyStore.set(NEW_MINION_METADATA_PATH,
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+ MinionTaskMetadataUtils.deleteTaskMetadata(propertyStore, TASK_TYPE,
TABLE_NAME_WITH_TYPE);
+ assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertFalse(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+ }
+
+ @Test
+ public void testDeleteTaskMetadataWithException() {
+ // Test happy path. No exceptions thrown.
+ HelixPropertyStore<ZNRecord> mockPropertyStore =
Mockito.mock(HelixPropertyStore.class);
+ when(mockPropertyStore.remove(ArgumentMatchers.anyString(),
ArgumentMatchers.anyInt())).thenReturn(true);
+ MinionTaskMetadataUtils.deleteTaskMetadata(mockPropertyStore, TASK_TYPE,
TABLE_NAME_WITH_TYPE);
+
+ // Test exception thrown
+ when(mockPropertyStore.remove(ArgumentMatchers.anyString(),
ArgumentMatchers.anyInt())).thenReturn(false);
+ try {
+ MinionTaskMetadataUtils.deleteTaskMetadata(mockPropertyStore, TASK_TYPE,
TABLE_NAME_WITH_TYPE);
+ fail("ZkException should have been thrown");
+ } catch (ZkException e) {
+ assertEquals(e.getMessage(), "Failed to delete task metadata:
TestTaskType, TestTable_OFFLINE");
+ }
+ }
@Test
public void testPersistTaskMetadata() {
- DummyTaskMetadata taskMetadata = new
DummyTaskMetadata("TestTable_OFFLINE", 1000);
+ DummyTaskMetadata taskMetadata = new
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 2000);
+
+ // the metadata will be written to the new path
+ HelixPropertyStore<ZNRecord> propertyStore = new FakePropertyStore();
+ MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE,
taskMetadata, EXPECTED_VERSION);
+ assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ taskMetadata.toZNRecord());
+
+ // the metadata will be written to the old path if only the old path exists
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(OLD_MINION_METADATA_PATH,
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE,
taskMetadata, EXPECTED_VERSION);
+ assertFalse(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ taskMetadata.toZNRecord());
+
+ // the metadata will be written to the new path if only the new path exists
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(NEW_MINION_METADATA_PATH,
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE,
taskMetadata, EXPECTED_VERSION);
+ assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ taskMetadata.toZNRecord());
+
+ // the metadata will be written to the new path if both paths exist
+ propertyStore = new FakePropertyStore();
+ propertyStore.set(OLD_MINION_METADATA_PATH,
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ propertyStore.set(NEW_MINION_METADATA_PATH,
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+ MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE,
taskMetadata, EXPECTED_VERSION);
+ assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+ assertEquals(propertyStore.get(NEW_MINION_METADATA_PATH, new Stat(),
ACCESS_OPTION), taskMetadata.toZNRecord());
+ assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore,
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+ taskMetadata.toZNRecord());
+ }
+
+ @Test
+ public void testPersistTaskMetadataWithException() {
+ DummyTaskMetadata taskMetadata = new
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 1000);
HelixPropertyStore<ZNRecord> mockPropertyStore =
Mockito.mock(HelixPropertyStore.class);
- String expectedPath =
"/MINION_TASK_METADATA/TestTaskType/TestTable_OFFLINE";
- int expectedVersion = -1;
+ String expectedPath = NEW_MINION_METADATA_PATH;
// Test happy path. No exceptions thrown.
- when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(),
expectedVersion,
- AccessOption.PERSISTENT)).thenReturn(true);
- MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore,
"TestTaskType", taskMetadata, expectedVersion);
- verify(mockPropertyStore, times(1)).set(expectedPath,
taskMetadata.toZNRecord(), expectedVersion,
- AccessOption.PERSISTENT);
+ when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(),
EXPECTED_VERSION, ACCESS_OPTION)).thenReturn(
+ true);
+ MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore, TASK_TYPE,
taskMetadata, EXPECTED_VERSION);
+ verify(mockPropertyStore, times(1)).set(expectedPath,
taskMetadata.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
// Test exception thrown
- when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(), -1,
AccessOption.PERSISTENT))
- .thenReturn(false);
+ when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(),
EXPECTED_VERSION, ACCESS_OPTION)).thenReturn(
+ false);
try {
- MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore,
"TestTaskType", taskMetadata, expectedVersion);
+ MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore,
TASK_TYPE, taskMetadata, EXPECTED_VERSION);
fail("ZkException should have been thrown");
} catch (ZkException e) {
- verify(mockPropertyStore, times(2)).set(expectedPath,
taskMetadata.toZNRecord(), expectedVersion,
- AccessOption.PERSISTENT);
+ verify(mockPropertyStore, times(2)).set(expectedPath,
taskMetadata.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
assertEquals(e.getMessage(), "Failed to persist minion metadata for
task: TestTaskType and metadata:"
+ " {\"tableNameWithType\":\"TestTable_OFFLINE\"}");
}
}
- public class DummyTaskMetadata extends BaseTaskMetadata {
+ public static class DummyTaskMetadata extends BaseTaskMetadata {
private final String _tableNameWithType;
private final long _metadataVal;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 2ba65048b1..6bd1d608af 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -95,7 +95,8 @@ public class ClusterInfoAccessor {
}
/**
- * Fetches the ZNRecord under MINION_TASK_METADATA/${taskType} for the given
taskType and tableNameWithType
+ * Fetches the ZNRecord under
MINION_TASK_METADATA/${tableNameWithType}/${taskType} for the given
+ * taskType and tableNameWithType
*
* @param taskType The type of the minion task
* @param tableNameWithType Table name with type
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
index 524453c328..de13e486b3 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
@@ -37,7 +37,8 @@ public class MinionTaskZkMetadataManager {
}
/**
- * Fetch the ZNRecord under
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask for the given
tableNameWithType
+ * Fetch the ZNRecord under
MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask for
+ * the given tableNameWithType
*/
public ZNRecord getRealtimeToOfflineSegmentsTaskZNRecord(String
tableNameWithType) {
return MinionTaskMetadataUtils
@@ -47,7 +48,7 @@ public class MinionTaskZkMetadataManager {
/**
* Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into the ZNode at
- * MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask
+ * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
* for the corresponding tableNameWithType
* @param expectedVersion Version expected to be updating, failing the call
if there's a mismatch
*/
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 8b5b5baf3d..37986014bc 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -77,7 +77,7 @@ import org.slf4j.LoggerFactory;
* - Repeat until k time buckets get created or we loop through all the
candidate segments:
* - Calculate merge/roll-up bucket:
* - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
found at
- * {@code MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>}
+ * MINION_TASK_METADATA/${tableNameWithType}/MergeRollupTask
* In case of cold-start, no ZNode will exist.
* A new ZNode will be created, with watermarkMs as the smallest time
found in all segments truncated to the
* closest bucket start time.
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index 3fb2fc39d1..b1ee862189 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
* 5. Sort records if sorting is enabled in the table config
*
* Before beginning the task, the <code>watermarkMs</code> is checked in the
minion task metadata ZNode,
- * located at
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/<tableNameWithType>
+ * located at
MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
* It should match the <code>windowStartMs</code>.
* The version of the znode is cached.
*
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 76d8333672..5ab23b6408 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
*
* Steps:
* - The watermarkMs is read from the {@link
RealtimeToOfflineSegmentsTaskMetadata} ZNode
- * found at
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ * found at
MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
* In case of cold-start, no ZNode will exist.
* A new ZNode will be created, with watermarkMs as the smallest time found
in the COMPLETED segments
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]