This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 72c2e77998 Update minion task metadata ZNode path (#8959)
72c2e77998 is described below

commit 72c2e77998b19a358f9419c2d898764e0874f095
Author: Haitao Zhang <[email protected]>
AuthorDate: Tue Jun 28 16:11:58 2022 -0700

    Update minion task metadata ZNode path (#8959)
    
    * update minion task metadata path
    
    * add test cases
    
    * fix format issue
    
    * rename methods
    
    * fix logic error
    
    * updated comments
    
    * change parameter order
---
 .../segmentpreselector/SegmentPreSelectorTest.java |   2 +-
 .../pinot/common/metadata/ZKMetadataProvider.java  |  32 ++---
 .../pinot/common/minion/BaseTaskMetadata.java      |   2 +-
 .../common/minion/MergeRollupTaskMetadata.java     |   2 +-
 .../common/minion/MinionTaskMetadataUtils.java     |  52 ++++++--
 .../RealtimeToOfflineSegmentsTaskMetadata.java     |   2 +-
 .../common/utils/helix}/FakePropertyStore.java     |  13 +-
 .../common/minion/MinionTaskMetadataUtilsTest.java | 144 ++++++++++++++++++---
 .../helix/core/minion/ClusterInfoAccessor.java     |   3 +-
 .../executor/MinionTaskZkMetadataManager.java      |   5 +-
 .../mergerollup/MergeRollupTaskGenerator.java      |   2 +-
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |   2 +-
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |   2 +-
 13 files changed, 211 insertions(+), 52 deletions(-)

diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
index 545845cf22..44abd8053e 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
@@ -26,12 +26,12 @@ import java.util.Set;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.broker.util.FakePropertyStore;
 import org.apache.pinot.common.lineage.LineageEntry;
 import org.apache.pinot.common.lineage.LineageEntryState;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.lineage.SegmentLineageUtils;
+import org.apache.pinot.common.utils.helix.FakePropertyStore;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 0633e32672..05fcbf1e50 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -66,8 +66,7 @@ public class ZKMetadataProvider {
   private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = 
"/MINION_TASK_METADATA";
 
   public static void setUserConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String username, ZNRecord znRecord) {
-    propertyStore
-        .set(constructPropertyStorePathForUserConfig(username), znRecord, 
AccessOption.PERSISTENT);
+    propertyStore.set(constructPropertyStorePathForUserConfig(username), 
znRecord, AccessOption.PERSISTENT);
   }
 
   public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String realtimeTableName,
@@ -130,7 +129,13 @@ public class ZKMetadataProvider {
     return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE, 
tableNameWithType);
   }
 
-  public static String constructPropertyStorePathForMinionTaskMetadata(String 
taskType, String tableNameWithType) {
+  public static String constructPropertyStorePathForMinionTaskMetadata(String 
tableNameWithType, String taskType) {
+    return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, 
tableNameWithType, taskType);
+  }
+
+  @Deprecated
+  public static String 
constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
+      String tableNameWithType) {
     return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, 
taskType, tableNameWithType);
   }
 
@@ -156,8 +161,7 @@ public class ZKMetadataProvider {
     }
   }
 
-  public static void 
removeUserConfigFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore,
-      String username) {
+  public static void 
removeUserConfigFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore, 
String username) {
     String propertyStorePath = 
constructPropertyStorePathForUserConfig(username);
     if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
       propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
@@ -224,8 +228,8 @@ public class ZKMetadataProvider {
 
   @Nullable
   public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> 
propertyStore, String username) {
-    ZNRecord znRecord = propertyStore
-        .get(constructPropertyStorePathForUserConfig(username), null, 
AccessOption.PERSISTENT);
+    ZNRecord znRecord =
+        propertyStore.get(constructPropertyStorePathForUserConfig(username), 
null, AccessOption.PERSISTENT);
     if (znRecord == null) {
       return null;
     }
@@ -240,14 +244,13 @@ public class ZKMetadataProvider {
 
   @Nullable
   public static List<UserConfig> 
getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    List<ZNRecord> znRecordss = propertyStore
-        .getChildren(PROPERTYSTORE_USER_CONFIGS_PREFIX, null, 
AccessOption.PERSISTENT);
+    List<ZNRecord> znRecordss =
+        propertyStore.getChildren(PROPERTYSTORE_USER_CONFIGS_PREFIX, null, 
AccessOption.PERSISTENT);
 
     try {
-      return Optional.ofNullable(znRecordss)
-          .orElseGet(() -> {
-            return new ArrayList<>();
-          
}).stream().map(AccessControlUserConfigUtils::fromZNRecord).collect(Collectors.toList());
+      return Optional.ofNullable(znRecordss).orElseGet(() -> {
+        return new ArrayList<>();
+      
}).stream().map(AccessControlUserConfigUtils::fromZNRecord).collect(Collectors.toList());
     } catch (Exception e) {
       LOGGER.error("Caught exception while getting user list configuration", 
e);
       return null;
@@ -256,8 +259,7 @@ public class ZKMetadataProvider {
 
   @Nullable
   public static List<String> getAllUserName(ZkHelixPropertyStore<ZNRecord> 
propertyStore) {
-    return propertyStore
-        .getChildNames(PROPERTYSTORE_USER_CONFIGS_PREFIX, 
AccessOption.PERSISTENT);
+    return propertyStore.getChildNames(PROPERTYSTORE_USER_CONFIGS_PREFIX, 
AccessOption.PERSISTENT);
   }
 
   @Nullable
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
index 4e8998912b..3135c98544 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/BaseTaskMetadata.java
@@ -27,7 +27,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * Base abstract class for minion task metadata.
  *
  * This metadata gets serialized and stored in zookeeper under the path:
- * MINION_TASK_METADATA/${taskName}/${tableNameWithType}
+ * MINION_TASK_METADATA/${tableNameWithType}/${taskName}
  */
 public abstract class BaseTaskMetadata {
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
index 3ed10b786d..4484ace493 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MergeRollupTaskMetadata.java
@@ -28,7 +28,7 @@ import org.apache.helix.ZNRecord;
  * The <code>watermarkMap</code> denotes the time (exclusive) upto which tasks 
have been executed for the bucket
  * granularity.
  *
- * This gets serialized and stored in zookeeper under the path 
MINION_TASK_METADATA/MergeRollupTask/tableNameWithType
+ * This gets serialized and stored in zookeeper under the path 
MINION_TASK_METADATA/${tableNameWithType}/MergeRollupTask
  */
 public class MergeRollupTaskMetadata extends BaseTaskMetadata {
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
index 5a1120218b..2213b5273f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
@@ -36,12 +36,24 @@ public final class MinionTaskMetadataUtils {
   }
 
   /**
-   * Fetches the ZNRecord for the given minion task and tableName, from 
MINION_TASK_METADATA/taskName/tableNameWthType
+   * Fetches the ZNRecord for the given minion task and tableName. Fetch from 
the new path
+   * MINION_TASK_METADATA/${tableNameWthType}/{taskType} if it exists; 
otherwise, fetch from the old path
+   * MINION_TASK_METADATA/${taskType}/${tableNameWthType}.
    */
   @Nullable
   public static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord> 
propertyStore, String taskType,
       String tableNameWithType) {
-    String path = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, 
tableNameWithType);
+    String newPath = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType,
 taskType);
+    if (propertyStore.exists(newPath, AccessOption.PERSISTENT)) {
+      return fetchTaskMetadata(propertyStore, newPath);
+    } else {
+      return fetchTaskMetadata(propertyStore,
+          
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
 tableNameWithType));
+    }
+  }
+
+  @Nullable
+  private static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord> 
propertyStore, String path) {
     Stat stat = new Stat();
     ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
     if (znRecord != null) {
@@ -51,29 +63,49 @@ public final class MinionTaskMetadataUtils {
   }
 
   /**
-   * Deletes the ZNRecord for the given minion task and tableName, from 
MINION_TASK_METADATA/taskName/tableNameWthType
+   * Deletes the ZNRecord for the given minion task and tableName, from both 
the new path
+   * MINION_TASK_METADATA/${tableNameWthType}/${taskType} and the old path
+   * MINION_TASK_METADATA/${taskType}/${tableNameWthType}.
    */
   public static void deleteTaskMetadata(HelixPropertyStore<ZNRecord> 
propertyStore, String taskType,
       String tableNameWithType) {
-    String path = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, 
tableNameWithType);
-    if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
+    String newPath = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType,
 taskType);
+    String oldPath =
+        
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
 tableNameWithType);
+    boolean newPathDeleted = propertyStore.remove(newPath, 
AccessOption.PERSISTENT);
+    boolean oldPathDeleted = propertyStore.remove(oldPath, 
AccessOption.PERSISTENT);
+    if (!newPathDeleted || !oldPathDeleted) {
       throw new ZkException("Failed to delete task metadata: " + taskType + ", 
" + tableNameWithType);
     }
   }
 
   /**
-   * Generic method for persisting {@link BaseTaskMetadata} to 
MINION_TASK_METADATA. The metadata will be saved in the
-   * ZNode under the path: 
/MINION_TASK_METADATA/${taskType}/${tableNameWithType}
+   * Generic method for persisting {@link BaseTaskMetadata} to 
MINION_TASK_METADATA. The metadata will
+   * be saved in the ZNode under the new path 
/MINION_TASK_METADATA/${tableNameWithType}/${taskType} if
+   * the old path already exists; otherwise, it will be saved in the ZNode 
under the old path
+   * /MINION_TASK_METADATA/${taskType}/${tableNameWithType}.
    *
    * Will fail if expectedVersion does not match.
    * Set expectedVersion -1 to override version check.
    */
   public static void persistTaskMetadata(HelixPropertyStore<ZNRecord> 
propertyStore, String taskType,
       BaseTaskMetadata taskMetadata, int expectedVersion) {
-    String path = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
+    String newPath =
+        
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskMetadata.getTableNameWithType(),
+            taskType);
+    String oldPath = 
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
         taskMetadata.getTableNameWithType());
-    if (!propertyStore
-        .set(path, taskMetadata.toZNRecord(), expectedVersion, 
AccessOption.PERSISTENT)) {
+    if (propertyStore.exists(newPath, AccessOption.PERSISTENT) || 
!propertyStore.exists(oldPath,
+        AccessOption.PERSISTENT)) {
+      persistTaskMetadata(newPath, propertyStore, taskType, taskMetadata, 
expectedVersion);
+    } else {
+      persistTaskMetadata(oldPath, propertyStore, taskType, taskMetadata, 
expectedVersion);
+    }
+  }
+
+  private static void persistTaskMetadata(String path, 
HelixPropertyStore<ZNRecord> propertyStore, String taskType,
+      BaseTaskMetadata taskMetadata, int expectedVersion) {
+    if (!propertyStore.set(path, taskMetadata.toZNRecord(), expectedVersion, 
AccessOption.PERSISTENT)) {
       throw new ZkException(
           "Failed to persist minion metadata for task: " + taskType + " and 
metadata: " + taskMetadata);
     }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
index 1ce9e09c1b..6d9ba9e9d0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
@@ -26,7 +26,7 @@ import org.apache.helix.ZNRecord;
  * The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks 
have been executed.
  *
  * This gets serialized and stored in zookeeper under the path
- * MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
  *
  * PinotTaskGenerator:
  * The <code>watermarkMs</code>> is used by the 
<code>RealtimeToOfflineSegmentsTaskGenerator</code>,
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/util/FakePropertyStore.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
similarity index 89%
rename from 
pinot-broker/src/test/java/org/apache/pinot/broker/util/FakePropertyStore.java
rename to 
pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
index fa3e9cafd2..362771b206 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/util/FakePropertyStore.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/FakePropertyStore.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.util;
+package org.apache.pinot.common.utils.helix;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -40,6 +40,11 @@ public class FakePropertyStore extends 
ZkHelixPropertyStore<ZNRecord> {
     return _contents.get(path);
   }
 
+  @Override
+  public boolean exists(String path, int options) {
+    return _contents.containsKey(path);
+  }
+
   @Override
   public void subscribeDataChanges(String path, IZkDataListener listener) {
     _listener = listener;
@@ -65,6 +70,12 @@ public class FakePropertyStore extends 
ZkHelixPropertyStore<ZNRecord> {
     }
   }
 
+  @Override
+  public boolean remove(String path, int options) {
+    _contents.remove(path);
+    return true;
+  }
+
   public void setContents(String path, ZNRecord contents)
       throws Exception {
     _contents.put(path, contents);
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
index 74d247f875..cf5a613780 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionTaskMetadataUtilsTest.java
@@ -22,50 +22,162 @@ import org.I0Itec.zkclient.exception.ZkException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.utils.helix.FakePropertyStore;
+import org.apache.zookeeper.data.Stat;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 
 /**
  * Tests for {@link MinionTaskMetadataUtils}
  */
 public class MinionTaskMetadataUtilsTest {
+  private static final String TABLE_NAME_WITH_TYPE = "TestTable_OFFLINE";
+  private static final String TASK_TYPE = "TestTaskType";
+  private static final String NEW_MINION_METADATA_PATH =
+      
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(TABLE_NAME_WITH_TYPE,
 TASK_TYPE);
+  private static final String OLD_MINION_METADATA_PATH =
+      
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(TASK_TYPE,
 TABLE_NAME_WITH_TYPE);
+  private static final DummyTaskMetadata NEW_TASK_METADATA = new 
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 1000);
+  private static final DummyTaskMetadata OLD_TASK_METADATA = new 
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 100);
+  private static final int EXPECTED_VERSION = -1;
+  private static final int ACCESS_OPTION = AccessOption.PERSISTENT;
+
+  @Test
+  public void testFetchTaskMetadata() {
+    // no metadata path exists
+    HelixPropertyStore<ZNRecord> propertyStore = new FakePropertyStore();
+    assertNull(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE));
+
+    // only the old metadata path exists
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(OLD_MINION_METADATA_PATH, 
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        OLD_TASK_METADATA.toZNRecord());
+
+    // only the new metadata path exists
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(NEW_MINION_METADATA_PATH, 
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        NEW_TASK_METADATA.toZNRecord());
+
+    // if two metadata paths exist at the same time, the new one will be used.
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(OLD_MINION_METADATA_PATH, 
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    propertyStore.set(NEW_MINION_METADATA_PATH, 
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        NEW_TASK_METADATA.toZNRecord());
+  }
+
+  @Test
+  public void testDeleteTaskMetadata() {
+    // no error
+    HelixPropertyStore<ZNRecord> propertyStore = new FakePropertyStore();
+    MinionTaskMetadataUtils.deleteTaskMetadata(propertyStore, TASK_TYPE, 
TABLE_NAME_WITH_TYPE);
+
+    // both metadata paths will be removed
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(OLD_MINION_METADATA_PATH, 
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    propertyStore.set(NEW_MINION_METADATA_PATH, 
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+    MinionTaskMetadataUtils.deleteTaskMetadata(propertyStore, TASK_TYPE, 
TABLE_NAME_WITH_TYPE);
+    assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertFalse(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+  }
+
+  @Test
+  public void testDeleteTaskMetadataWithException() {
+    // Test happy path. No exceptions thrown.
+    HelixPropertyStore<ZNRecord> mockPropertyStore = 
Mockito.mock(HelixPropertyStore.class);
+    when(mockPropertyStore.remove(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyInt())).thenReturn(true);
+    MinionTaskMetadataUtils.deleteTaskMetadata(mockPropertyStore, TASK_TYPE, 
TABLE_NAME_WITH_TYPE);
+
+    // Test exception thrown
+    when(mockPropertyStore.remove(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyInt())).thenReturn(false);
+    try {
+      MinionTaskMetadataUtils.deleteTaskMetadata(mockPropertyStore, TASK_TYPE, 
TABLE_NAME_WITH_TYPE);
+      fail("ZkException should have been thrown");
+    } catch (ZkException e) {
+      assertEquals(e.getMessage(), "Failed to delete task metadata: 
TestTaskType, TestTable_OFFLINE");
+    }
+  }
 
   @Test
   public void testPersistTaskMetadata() {
-    DummyTaskMetadata taskMetadata = new 
DummyTaskMetadata("TestTable_OFFLINE", 1000);
+    DummyTaskMetadata taskMetadata = new 
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 2000);
+
+    // the metadata will be written to the new path
+    HelixPropertyStore<ZNRecord> propertyStore = new FakePropertyStore();
+    MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE, 
taskMetadata, EXPECTED_VERSION);
+    assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        taskMetadata.toZNRecord());
+
+    // the metadata will be written to the old path if only the old path exists
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(OLD_MINION_METADATA_PATH, 
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE, 
taskMetadata, EXPECTED_VERSION);
+    assertFalse(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        taskMetadata.toZNRecord());
+
+    // the metadata will be written to the new path if only the new path exists
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(NEW_MINION_METADATA_PATH, 
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE, 
taskMetadata, EXPECTED_VERSION);
+    assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertFalse(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        taskMetadata.toZNRecord());
+
+    // the metadata will be written to the new path if both paths exist
+    propertyStore = new FakePropertyStore();
+    propertyStore.set(OLD_MINION_METADATA_PATH, 
OLD_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    propertyStore.set(NEW_MINION_METADATA_PATH, 
NEW_TASK_METADATA.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
+    MinionTaskMetadataUtils.persistTaskMetadata(propertyStore, TASK_TYPE, 
taskMetadata, EXPECTED_VERSION);
+    assertTrue(propertyStore.exists(NEW_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertTrue(propertyStore.exists(OLD_MINION_METADATA_PATH, ACCESS_OPTION));
+    assertEquals(propertyStore.get(NEW_MINION_METADATA_PATH, new Stat(), 
ACCESS_OPTION), taskMetadata.toZNRecord());
+    assertEquals(MinionTaskMetadataUtils.fetchTaskMetadata(propertyStore, 
TASK_TYPE, TABLE_NAME_WITH_TYPE),
+        taskMetadata.toZNRecord());
+  }
+
+  @Test
+  public void testPersistTaskMetadataWithException() {
+    DummyTaskMetadata taskMetadata = new 
DummyTaskMetadata(TABLE_NAME_WITH_TYPE, 1000);
     HelixPropertyStore<ZNRecord> mockPropertyStore = 
Mockito.mock(HelixPropertyStore.class);
-    String expectedPath = 
"/MINION_TASK_METADATA/TestTaskType/TestTable_OFFLINE";
-    int expectedVersion = -1;
+    String expectedPath = NEW_MINION_METADATA_PATH;
 
     // Test happy path. No exceptions thrown.
-    when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(), 
expectedVersion,
-        AccessOption.PERSISTENT)).thenReturn(true);
-    MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore, 
"TestTaskType", taskMetadata, expectedVersion);
-    verify(mockPropertyStore, times(1)).set(expectedPath, 
taskMetadata.toZNRecord(), expectedVersion,
-        AccessOption.PERSISTENT);
+    when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(), 
EXPECTED_VERSION, ACCESS_OPTION)).thenReturn(
+        true);
+    MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore, TASK_TYPE, 
taskMetadata, EXPECTED_VERSION);
+    verify(mockPropertyStore, times(1)).set(expectedPath, 
taskMetadata.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
 
     // Test exception thrown
-    when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(), -1, 
AccessOption.PERSISTENT))
-        .thenReturn(false);
+    when(mockPropertyStore.set(expectedPath, taskMetadata.toZNRecord(), 
EXPECTED_VERSION, ACCESS_OPTION)).thenReturn(
+        false);
     try {
-      MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore, 
"TestTaskType", taskMetadata, expectedVersion);
+      MinionTaskMetadataUtils.persistTaskMetadata(mockPropertyStore, 
TASK_TYPE, taskMetadata, EXPECTED_VERSION);
       fail("ZkException should have been thrown");
     } catch (ZkException e) {
-      verify(mockPropertyStore, times(2)).set(expectedPath, 
taskMetadata.toZNRecord(), expectedVersion,
-          AccessOption.PERSISTENT);
+      verify(mockPropertyStore, times(2)).set(expectedPath, 
taskMetadata.toZNRecord(), EXPECTED_VERSION, ACCESS_OPTION);
       assertEquals(e.getMessage(), "Failed to persist minion metadata for 
task: TestTaskType and metadata:"
           + " {\"tableNameWithType\":\"TestTable_OFFLINE\"}");
     }
   }
 
-  public class DummyTaskMetadata extends BaseTaskMetadata {
+  public static class DummyTaskMetadata extends BaseTaskMetadata {
 
     private final String _tableNameWithType;
     private final long _metadataVal;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 2ba65048b1..6bd1d608af 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -95,7 +95,8 @@ public class ClusterInfoAccessor {
   }
 
   /**
-   * Fetches the ZNRecord under MINION_TASK_METADATA/${taskType} for the given 
taskType and tableNameWithType
+   * Fetches the ZNRecord under 
MINION_TASK_METADATA/${tableNameWithType}/${taskType} for the given
+   * taskType and tableNameWithType
    *
    * @param taskType The type of the minion task
    * @param tableNameWithType Table name with type
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
index 524453c328..de13e486b3 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
@@ -37,7 +37,8 @@ public class MinionTaskZkMetadataManager {
   }
 
   /**
-   * Fetch the ZNRecord under 
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask for the given 
tableNameWithType
+   * Fetch the ZNRecord under 
MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask for
+   * the given tableNameWithType
    */
   public ZNRecord getRealtimeToOfflineSegmentsTaskZNRecord(String 
tableNameWithType) {
     return MinionTaskMetadataUtils
@@ -47,7 +48,7 @@ public class MinionTaskZkMetadataManager {
 
   /**
    * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into the ZNode at
-   * MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask
+   * MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
    * for the corresponding tableNameWithType
    * @param expectedVersion Version expected to be updating, failing the call 
if there's a mismatch
    */
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 8b5b5baf3d..37986014bc 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -77,7 +77,7 @@ import org.slf4j.LoggerFactory;
  *    - Repeat until k time buckets get created or we loop through all the 
candidate segments:
  *      - Calculate merge/roll-up bucket:
  *        - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode 
found at
- *          {@code MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>}
+ *          MINION_TASK_METADATA/${tableNameWithType}/MergeRollupTask
  *          In case of cold-start, no ZNode will exist.
  *          A new ZNode will be created, with watermarkMs as the smallest time 
found in all segments truncated to the
  *          closest bucket start time.
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index 3fb2fc39d1..b1ee862189 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * 5. Sort records if sorting is enabled in the table config
  *
  * Before beginning the task, the <code>watermarkMs</code> is checked in the 
minion task metadata ZNode,
- * located at 
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/<tableNameWithType>
+ * located at 
MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
  * It should match the <code>windowStartMs</code>.
  * The version of the znode is cached.
  *
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 76d8333672..5ab23b6408 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
  *
  * Steps:
  *  - The watermarkMs is read from the {@link 
RealtimeToOfflineSegmentsTaskMetadata} ZNode
- *  found at 
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ *  found at 
MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
  *  In case of cold-start, no ZNode will exist.
  *  A new ZNode will be created, with watermarkMs as the smallest time found 
in the COMPLETED segments
  *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to