This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3219189 Segment merge lineage data structure (#3543)
3219189 is described below
commit 321918900edb1bb2a4741c2963a5efad1b5c83a8
Author: Seunghyun Lee <[email protected]>
AuthorDate: Mon Dec 3 19:18:14 2018 -0800
Segment merge lineage data structure (#3543)
* Segment merge lineage data structure
1. Added segment merge lineage that is a wrapper class of ZNRecord
2. Added segment merge group that will be used by broker during segment
selection process
3. Added a unit test
* Addressing comments and added more descriptions
---
.../pinot/common/lineage/SegmentGroup.java | 72 +++++
.../pinot/common/lineage/SegmentMergeLineage.java | 348 +++++++++++++++++++++
.../lineage/SegmentMergeLineageAccessHelper.java | 85 +++++
.../pinot/common/metadata/ZKMetadataProvider.java | 5 +
.../common/lineage/SegmentMergeLineageTest.java | 142 +++++++++
5 files changed, 652 insertions(+)
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
new file mode 100644
index 0000000..fb28ac1
--- /dev/null
+++
b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentGroup.java
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * A single node of the segment merge lineage tree.
+ *
+ * A node carries its group id, its group level, the segment names that belong to the group, and links to
+ * its parent and children nodes. The class is a plain mutable holder: every property is {@code null}
+ * (or {@code 0} for the level) until the matching setter is called, and collections are stored and
+ * returned by reference without copying.
+ */
+public class SegmentGroup {
+
+  // Identity of the node
+  private String _groupId;
+  private int _groupLevel;
+
+  // Payload of the node
+  private Set<String> _segments;
+
+  // Tree links
+  private SegmentGroup _parentGroup;
+  private List<SegmentGroup> _childrenGroups;
+
+  /** @return the group id, or null if it has not been set */
+  public String getGroupId() {
+    return _groupId;
+  }
+
+  /** @param groupId the group id to assign to this node */
+  public void setGroupId(String groupId) {
+    _groupId = groupId;
+  }
+
+  /** @return the group level, 0 if it has not been set */
+  public int getGroupLevel() {
+    return _groupLevel;
+  }
+
+  /** @param groupLevel the group level to assign to this node */
+  public void setGroupLevel(int groupLevel) {
+    _groupLevel = groupLevel;
+  }
+
+  /** @return the segment names of this group (same instance that was set), or null if not set */
+  public Set<String> getSegments() {
+    return _segments;
+  }
+
+  /** @param segments the segment names of this group; stored by reference */
+  public void setSegments(Set<String> segments) {
+    _segments = segments;
+  }
+
+  /** @return the parent node, or null if no parent has been set */
+  public SegmentGroup getParentGroup() {
+    return _parentGroup;
+  }
+
+  /** @param parentGroup the node to record as this node's parent */
+  public void setParentGroup(SegmentGroup parentGroup) {
+    _parentGroup = parentGroup;
+  }
+
+  /** @return the children nodes (same instance that was set), or null if no children have been set */
+  public List<SegmentGroup> getChildrenGroups() {
+    return _childrenGroups;
+  }
+
+  /** @param childrenGroups the children nodes; stored by reference */
+  public void setChildrenGroups(List<SegmentGroup> childrenGroups) {
+    _childrenGroups = childrenGroups;
+  }
+}
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
new file mode 100644
index 0000000..3df8aa9
--- /dev/null
+++
b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineage.java
@@ -0,0 +1,348 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import com.linkedin.pinot.common.utils.EqualityUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to represent segment merge lineage information.
+ *
+ * Segment merge lineage information is serialized into a znode and stored in a Helix property store
+ * (ZooKeeper). This information will be used by the broker, the segment merge task generator, and the
+ * retention manager.
+ *
+ * For each segment group, we are storing the following information:
+ * 1. group id
+ * - group identifier (will be stored in time based uuid format)
+ * 2. group level
+ * - the group level allows us to have a hierarchical representation of the segment lineage. When we
+ * assign a merge task, we only merge/roll up segment groups that are on the same level.
+ * (e.g. if hourly segment groups are in level 0, daily segment groups will belong to level 1)
+ * 3. segments
+ * - segments that belong to a particular segment group
+ * 4. lineage information
+ * - If a segment group is created by merging multiple children segment
groups, we write the lineage information
+ * (e.g. segment group C is merged from segment group A, B)
+ *
+ *
+ * Example)
+ * Let's say that we have 3 segments to begin with (S1, S2, S3). For original
segments, we treat them specially by
+ * regarding each segment as a group. So, our lineage info will have the
following data.
+ *
+ * _parentGroupToChildrenGroupsMap = empty
+ * _levelToGroupToSegmentsMap = { level_0 -> { G1 -> S1, G2 -> S2, G3 -> S3} }
+ *
+ * Let's say that a merge task merges S1, S2, S3 into S4, S5. Then, we now
have a new group G4 (S4,S5). The lineage
+ * now would be the following:
+ *
+ * _parentGroupToChildrenGroupsMap = { G4 -> { G1, G2, G3} }
+ * _levelToGroupToSegmentsMap = { level_0 -> { G1 -> S1, G2 -> S2, G3 -> S3}
+ * level_1 -> { G4 -> S4, S5} }
+ *
+ */
+public class SegmentMergeLineage {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentMergeLineage.class);
+
+ private static final String LEVEL_KEY_PREFIX = "level_";
+ private static final String ROOT_NODE_GROUP_ID = "root";
+ private static final String SEGMENT_DELIMITER = ",";
+ private static final int DEFAULT_GROUP_LEVEL = 0;
+
+ private String _tableNameWithType;
+
+ // Mapping of group id to children group ids
+ private Map<String, List<String>> _parentGroupToChildrenGroupsMap;
+
+ // Mapping of group level to group id to segments that belong to a group
+ private Map<Integer, Map<String, List<String>>> _levelToGroupToSegmentsMap;
+
+  /**
+   * Creates an empty lineage (no groups, no parent/children relations) for the given table.
+   *
+   * @param tableNameWithType a table name with type
+   */
+  public SegmentMergeLineage(String tableNameWithType) {
+    this(tableNameWithType, new HashMap<>(), new HashMap<>());
+  }
+
+  /**
+   * Creates a lineage backed by the given maps. Both maps are stored by reference (no copy is made), so
+   * later changes made through this object are visible to the caller's maps and vice versa.
+   *
+   * @param tableNameWithType a table name with type
+   * @param segmentGroupLineageMap mapping of parent group id to its children group ids
+   * @param levelToGroupToSegmentMap mapping of group level to group id to the segments of that group
+   */
+  public SegmentMergeLineage(String tableNameWithType, Map<String, List<String>> segmentGroupLineageMap,
+      Map<Integer, Map<String, List<String>>> levelToGroupToSegmentMap) {
+    _tableNameWithType = tableNameWithType;
+    _parentGroupToChildrenGroupsMap = segmentGroupLineageMap;
+    _levelToGroupToSegmentsMap = levelToGroupToSegmentMap;
+  }
+
+ /**
+ * Get the table name this lineage was constructed with.
+ *
+ * @return the tableNameWithType value passed to the constructor
+ */
+ public String getTableName() {
+ return _tableNameWithType;
+ }
+
+  /**
+   * Build a segment merge lineage from its ZNRecord representation (the inverse of {@link #toZNRecord()}).
+   *
+   * The record id is used as the table name, the list fields as the parent group to children groups
+   * mapping, and each map field keyed "level_N" as the group to segments mapping of level N, where the
+   * segments of a group are stored as a single comma-joined string.
+   *
+   * @param record a ZNRecord of segment merge lineage
+   * @return the segment merge lineage described by the record
+   * @throws NumberFormatException if the text after the level key prefix of a map field key is not an integer
+   */
+  public static SegmentMergeLineage fromZNRecord(ZNRecord record) {
+    String tableNameWithType = record.getId();
+
+    // Copy the list fields instead of adopting the record's own map and lists, so that later
+    // addSegmentGroup/removeSegmentGroup calls on the lineage cannot modify the source record.
+    Map<String, List<String>> segmentGroupLineageMap = new HashMap<>();
+    for (Map.Entry<String, List<String>> lineageEntry : record.getListFields().entrySet()) {
+      segmentGroupLineageMap.put(lineageEntry.getKey(), new ArrayList<>(lineageEntry.getValue()));
+    }
+
+    Map<Integer, Map<String, List<String>>> groupToSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : record.getMapFields().entrySet()) {
+      String levelKey = entry.getKey();
+      Integer level = Integer.parseInt(levelKey.substring(LEVEL_KEY_PREFIX.length()));
+      Map<String, List<String>> groupToSegmentsForLevel = new HashMap<>();
+      for (Map.Entry<String, String> groupEntry : entry.getValue().entrySet()) {
+        String segmentsString = groupEntry.getValue();
+        List<String> segments = new ArrayList<>();
+        // toZNRecord() writes a group without segments as "". Splitting "" yields a single empty
+        // string, which would turn the empty group into a group holding one segment named "".
+        if (!segmentsString.isEmpty()) {
+          segments.addAll(Arrays.asList(segmentsString.split(SEGMENT_DELIMITER)));
+        }
+        groupToSegmentsForLevel.put(groupEntry.getKey(), segments);
+      }
+      groupToSegmentsMap.put(level, groupToSegmentsForLevel);
+    }
+    return new SegmentMergeLineage(tableNameWithType, segmentGroupLineageMap, groupToSegmentsMap);
+  }
+
+  /**
+   * Serialize this lineage into a ZNRecord.
+   *
+   * The table name becomes the record id, the parent group to children groups mapping becomes the list
+   * fields (the map is handed to the record as is, without copying), and every group level becomes one
+   * map field keyed "level_N" whose values are the comma-joined segment names of each group.
+   *
+   * @return a ZNRecord of segment merge lineage
+   */
+  public ZNRecord toZNRecord() {
+    // One map field per level: "level_N" -> { groupId -> "seg1,seg2,..." }
+    Map<String, Map<String, String>> mapFields = new HashMap<>();
+    _levelToGroupToSegmentsMap.forEach((level, groupToSegments) -> {
+      Map<String, String> joinedSegmentsByGroup = new HashMap<>();
+      groupToSegments.forEach(
+          (groupId, segments) -> joinedSegmentsByGroup.put(groupId, String.join(SEGMENT_DELIMITER, segments)));
+      mapFields.put(LEVEL_KEY_PREFIX + level, joinedSegmentsByGroup);
+    });
+
+    ZNRecord znRecord = new ZNRecord(_tableNameWithType);
+    znRecord.setListFields(_parentGroupToChildrenGroupsMap);
+    znRecord.setMapFields(mapFields);
+    return znRecord;
+  }
+
+  /**
+   * Add segment merge lineage information
+   *
+   * The level of the new group is derived from its children: a group without children is placed on the
+   * base level, otherwise on the level directly above the one that contains all of its children.
+   * Nothing is modified when the call fails.
+   *
+   * @param groupId a group id; must not be used by any existing group on any level
+   * @param currentGroupSegments a list of segments that belongs to the group (copied, must not be null)
+   * @param childrenGroups a list of children groups that the current group covers (copied). All children
+   *                       group ids have to be from the same group level. Null or empty for an original
+   *                       (base level) group.
+   * @throws InvalidConfigException if the group id already exists, or if the children groups do not all
+   *                                exist on one single level
+   */
+  public void addSegmentGroup(String groupId, List<String> currentGroupSegments, List<String> childrenGroups)
+      throws InvalidConfigException {
+    // Get group level
+    Integer groupLevel = getGroupLevel(childrenGroups);
+
+    // Group ids are looked up and removed across all levels, so the id must be unique across all levels
+    // and not only within the level the new group lands on. The check also runs before the level map is
+    // touched so that a rejected call does not leave an empty level behind.
+    if (getSegmentsForGroup(groupId) != null || _parentGroupToChildrenGroupsMap.containsKey(groupId)) {
+      throw new InvalidConfigException("Group id : " + groupId + " already exists for table " + _tableNameWithType);
+    }
+
+    // Update group to segments map
+    _levelToGroupToSegmentsMap.computeIfAbsent(groupLevel, k -> new HashMap<>())
+        .put(groupId, new ArrayList<>(currentGroupSegments));
+
+    // Update segment group lineage map. Base level groups have no children, so they get no entry.
+    if (groupLevel > DEFAULT_GROUP_LEVEL) {
+      _parentGroupToChildrenGroupsMap.put(groupId, new ArrayList<>(childrenGroups));
+    }
+
+    LOGGER.info("New group has been added successfully to the segment lineage. (tableName: {}, groupId: {}, "
+        + "currentGroupSegments: {}, childrenGroups: {})", _tableNameWithType, groupId, currentGroupSegments,
+        childrenGroups);
+  }
+
+  /**
+   * Remove segment merge information given a group id
+   *
+   * The group's own lineage entry is dropped, the id is taken out of the children list of every
+   * remaining parent, and the group's segments entry is removed from every level. Level maps and
+   * children lists that become empty are kept. Removing an unknown id changes nothing.
+   *
+   * @param groupId a group id
+   */
+  public void removeSegmentGroup(String groupId) {
+    // Parent to children mapping: forget the group as a parent, then as a child of other groups
+    _parentGroupToChildrenGroupsMap.remove(groupId);
+    _parentGroupToChildrenGroupsMap.values().forEach(children -> children.remove(groupId));
+
+    // Group to segments mapping: the id is removed from whichever level holds it
+    _levelToGroupToSegmentsMap.values().forEach(groupToSegments -> groupToSegments.remove(groupId));
+
+    LOGGER.info("Group {} has been successfully removed for table {}.", groupId, _tableNameWithType);
+  }
+
+  /**
+   * Construct a lineage tree and returns the root node
+   *
+   * One node is created per group, nodes are linked according to the parent group to children groups
+   * mapping, and every node that ends up without a parent is attached to a synthetic root node whose
+   * group id is "root". Lineage entries that reference a parent or child without a group entry are
+   * ignored instead of failing.
+   *
+   * @return a root node for lineage tree
+   */
+  public SegmentGroup getMergeLineageRootSegmentGroup() {
+    // Create group nodes
+    Map<String, SegmentGroup> groupNodes = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, List<String>>> groupEntryForLevel : _levelToGroupToSegmentsMap.entrySet()) {
+      Integer level = groupEntryForLevel.getKey();
+      for (Map.Entry<String, List<String>> entry : groupEntryForLevel.getValue().entrySet()) {
+        String groupId = entry.getKey();
+        SegmentGroup groupNode = new SegmentGroup();
+        groupNode.setGroupId(groupId);
+        groupNode.setSegments(new HashSet<>(entry.getValue()));
+        groupNode.setGroupLevel(level);
+        groupNodes.put(groupId, groupNode);
+      }
+    }
+
+    // Add edges by updating children & parent group information
+    for (Map.Entry<String, List<String>> lineageEntry : _parentGroupToChildrenGroupsMap.entrySet()) {
+      String parentGroupId = lineageEntry.getKey();
+      SegmentGroup parentNode = groupNodes.get(parentGroupId);
+      if (parentNode == null) {
+        // The lineage map names a parent that has no segments entry (inconsistent input maps). Skip it
+        // rather than fail with a NullPointerException; its children stay parentless and are attached
+        // to the root below.
+        LOGGER.warn("Group {} has lineage information but no group entry for table {}, skipping.", parentGroupId,
+            _tableNameWithType);
+        continue;
+      }
+      List<SegmentGroup> childrenGroups = new ArrayList<>();
+      for (String childGroupId : lineageEntry.getValue()) {
+        SegmentGroup childNode = groupNodes.get(childGroupId);
+        if (childNode != null) {
+          childrenGroups.add(childNode);
+          childNode.setParentGroup(parentNode);
+        }
+      }
+      parentNode.setChildrenGroups(childrenGroups);
+    }
+
+    // Create a root node and hang every parentless group under it
+    SegmentGroup root = new SegmentGroup();
+    root.setGroupId(ROOT_NODE_GROUP_ID);
+    List<SegmentGroup> childrenForRoot = new ArrayList<>();
+    for (SegmentGroup group : groupNodes.values()) {
+      if (group.getParentGroup() == null) {
+        group.setParentGroup(root);
+        childrenForRoot.add(group);
+      }
+    }
+    root.setChildrenGroups(childrenForRoot);
+
+    return root;
+  }
+
+  /**
+   * Get a list of segments for a given group id
+   *
+   * Every level is searched and the first match is returned. The returned list is the internal one, not
+   * a copy.
+   *
+   * @param groupId a group id
+   * @return a list of segments that belongs to the given group id, null if the group does not exist
+   */
+  public List<String> getSegmentsForGroup(String groupId) {
+    return _levelToGroupToSegmentsMap.values()
+        .stream()
+        .map(groupToSegments -> groupToSegments.get(groupId))
+        .filter(segments -> segments != null)
+        .findFirst()
+        .orElse(null);
+  }
+
+ /**
+ * Get a list of children group ids for a given group id
+ *
+ * The lookup is done on the parent group to children groups mapping only. Groups added without
+ * children (base level groups) never get an entry in that mapping, so null is returned for them as
+ * well as for unknown group ids. The returned list is the internal one, not a copy.
+ *
+ * @param groupId a group id
+ * @return a list of children groups that are covered by the given group id, null if the group id has
+ * no lineage entry
+ */
+ public List<String> getChildrenForGroup(String groupId) {
+ return _parentGroupToChildrenGroupsMap.get(groupId);
+ }
+
+  /**
+   * Get a list of all group levels
+   *
+   * @return a new list holding every group level present in the lineage, in ascending order
+   */
+  public List<Integer> getAllGroupLevels() {
+    List<Integer> sortedLevels = new ArrayList<>(_levelToGroupToSegmentsMap.keySet());
+    sortedLevels.sort(Integer::compareTo);
+    return sortedLevels;
+  }
+
+  /**
+   * Get a list of group ids for a given group level
+   *
+   * The ids are copied out of the key set of the level's map, so their order is whatever that key set
+   * iterates in.
+   *
+   * @param groupLevel a group level
+   * @return a new list of group ids that belongs to the given group level, null if the group level does
+   *         not exist
+   */
+  public List<String> getGroupIdsForGroupLevel(int groupLevel) {
+    Map<String, List<String>> groupsOnLevel = _levelToGroupToSegmentsMap.get(groupLevel);
+    if (groupsOnLevel == null) {
+      return null;
+    }
+    return new ArrayList<>(groupsOnLevel.keySet());
+  }
+
+  /**
+   * Helper function to compute group level given children groups
+   *
+   * A group without children belongs to the base level. Otherwise the group is one level above the
+   * level whose groups include all of the given children.
+   *
+   * @param childrenGroups a list of children group ids, may be null or empty
+   * @return group level
+   * @throws InvalidConfigException if no single level contains all of the children groups
+   */
+  private Integer getGroupLevel(List<String> childrenGroups) throws InvalidConfigException {
+    // If no children exists, the group belongs to the base level.
+    if (childrenGroups == null || childrenGroups.isEmpty()) {
+      return DEFAULT_GROUP_LEVEL;
+    }
+
+    for (Map.Entry<Integer, Map<String, List<String>>> entry : _levelToGroupToSegmentsMap.entrySet()) {
+      if (entry.getValue().keySet().containsAll(childrenGroups)) {
+        return entry.getKey() + 1;
+      }
+    }
+
+    // At this point, not all children groups are covered by one level, cannot add group.
+    // The message previously ended with a stray "table" glued onto the children list.
+    throw new InvalidConfigException("Cannot compute group level because not all children groups exist "
+        + "in the segment merge lineage, table name: " + _tableNameWithType + ", children groups: "
+        + childrenGroups);
+  }
+
+  /**
+   * Two lineages are equal when they are of the same class and their table name, parent group to
+   * children groups mapping and level to group to segments mapping are all equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    SegmentMergeLineage other = (SegmentMergeLineage) o;
+    if (!EqualityUtils.isEqual(_tableNameWithType, other._tableNameWithType)) {
+      return false;
+    }
+    if (!EqualityUtils.isEqual(_parentGroupToChildrenGroupsMap, other._parentGroupToChildrenGroupsMap)) {
+      return false;
+    }
+    return EqualityUtils.isEqual(_levelToGroupToSegmentsMap, other._levelToGroupToSegmentsMap);
+  }
+
+  /**
+   * Hash code computed from the same three fields that {@link #equals(Object)} compares, folded in the
+   * order table name, parent group to children groups mapping, level to group to segments mapping.
+   */
+  @Override
+  public int hashCode() {
+    int tableNameHash = EqualityUtils.hashCodeOf(_tableNameWithType);
+    int withLineageHash = EqualityUtils.hashCodeOf(tableNameHash, _parentGroupToChildrenGroupsMap);
+    return EqualityUtils.hashCodeOf(withLineageHash, _levelToGroupToSegmentsMap);
+  }
+}
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
new file mode 100644
index 0000000..848d7b8
--- /dev/null
+++
b/pinot-common/src/main/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageAccessHelper.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
+import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.common.utils.retry.RetryPolicy;
+import java.util.List;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Static helpers to read and write the segment merge lineage of a table from/to the property store.
+ * The znode path is obtained from
+ * {@link ZKMetadataProvider#constructPropertyStorePathForSegmentMergeLineage(String)}.
+ */
+public class SegmentMergeLineageAccessHelper {
+
+  /**
+   * Read the segment merge lineage ZNRecord from the property store
+   *
+   * When a record is found, its version is set to the version reported in the znode stat of the read.
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a ZNRecord of segment merge lineage, return null if znode does not exist
+   */
+  public static ZNRecord getSegmentMergeLineageZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    Stat znodeStat = new Stat();
+    ZNRecord lineageRecord = propertyStore.get(
+        ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(tableNameWithType), znodeStat,
+        AccessOption.PERSISTENT);
+    if (lineageRecord == null) {
+      return null;
+    }
+    lineageRecord.setVersion(znodeStat.getVersion());
+    return lineageRecord;
+  }
+
+  /**
+   * Read the segment merge lineage from the property store
+   *
+   * @param propertyStore a property store
+   * @param tableNameWithType a table name with type
+   * @return a segment merge lineage, return null if znode does not exist
+   */
+  public static SegmentMergeLineage getSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    ZNRecord lineageRecord = getSegmentMergeLineageZNRecord(propertyStore, tableNameWithType);
+    return lineageRecord == null ? null : SegmentMergeLineage.fromZNRecord(lineageRecord);
+  }
+
+  /**
+   * Write the segment merge lineage to the property store
+   *
+   * @param propertyStore a property store
+   * @param segmentMergeLineage a segment merge lineage; its table name decides the znode path
+   * @param expectedVersion the version handed to the property store's versioned set. NOTE(review):
+   *                        presumably the write is rejected when the stored znode version differs,
+   *                        confirm against the Helix documentation
+   * @return true if update is successful. false otherwise.
+   */
+  public static boolean writeSegmentMergeLineage(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      SegmentMergeLineage segmentMergeLineage, int expectedVersion) {
+    String lineagePath =
+        ZKMetadataProvider.constructPropertyStorePathForSegmentMergeLineage(segmentMergeLineage.getTableName());
+    ZNRecord lineageRecord = segmentMergeLineage.toZNRecord();
+    return propertyStore.set(lineagePath, lineageRecord, expectedVersion, AccessOption.PERSISTENT);
+  }
+}
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
index fe55a0c..e9d6b63 100644
---
a/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/com/linkedin/pinot/common/metadata/ZKMetadataProvider.java
@@ -53,6 +53,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX =
"/CONFIGS/TABLE";
private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX =
"/CONFIGS/INSTANCE";
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX =
"/CONFIGS/CLUSTER";
+ private static final String PROPERTYSTORE_SEGMENT_MERGE_LINEAGE =
"/SEGMENT_MERGE_LINEAGE";
public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String realtimeTableName,
ZNRecord znRecord) {
@@ -106,6 +107,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX,
controllerConfigKey);
}
+ /**
+ * Construct the property store path of the segment merge lineage znode for a table by joining
+ * PROPERTYSTORE_SEGMENT_MERGE_LINEAGE ("/SEGMENT_MERGE_LINEAGE") and the table name with "/".
+ * NOTE(review): presumably yields "/SEGMENT_MERGE_LINEAGE/<tableNameWithType>" - depends on
+ * StringUtil.join, which is not shown here.
+ *
+ * @param tableNameWithType a table name with type
+ * @return the property store path for the table's segment merge lineage
+ */
+ public static String constructPropertyStorePathForSegmentMergeLineage(String
tableNameWithType) {
+ return StringUtil.join("/", PROPERTYSTORE_SEGMENT_MERGE_LINEAGE,
tableNameWithType);
+ }
+
public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord>
propertyStore, String resourceNameForResource,
String segmentName) {
return
propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource,
segmentName),
diff --git
a/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
new file mode 100644
index 0000000..1081474
--- /dev/null
+++
b/pinot-common/src/test/java/com/linkedin/pinot/common/lineage/SegmentMergeLineageTest.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.lineage;
+
+import com.linkedin.pinot.common.exception.InvalidConfigException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class SegmentMergeLineageTest {
+
+  /**
+   * Walks through a typical lineage life cycle: original segments are registered as base level groups,
+   * merge tasks add groups on the next level, then the read APIs, the ZNRecord round trip and the group
+   * removal are checked.
+   */
+  @Test
+  public void testSegmentMergeLineage() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+
+    // Start with 3 original segments.
+    String groupId1 = "G1";
+    List<String> groupSegments1 = Arrays.asList("segment1");
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    String groupId2 = "G2";
+    List<String> groupSegments2 = Arrays.asList("segment2");
+    segmentMergeLineage.addSegmentGroup(groupId2, groupSegments2, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId2), groupSegments2);
+
+    String groupId3 = "G3";
+    List<String> groupSegments3 = Arrays.asList("segment3");
+    segmentMergeLineage.addSegmentGroup(groupId3, groupSegments3, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId3), groupSegments3);
+
+    // Simulating when a merge task merges segment 1,2 into segment 4.
+    String groupId4 = "G4";
+    List<String> groupSegments4 = Arrays.asList("segment4");
+    List<String> childrenOfGroup4 = Arrays.asList(groupId1, groupId2);
+    segmentMergeLineage.addSegmentGroup(groupId4, groupSegments4, childrenOfGroup4);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId4), groupSegments4);
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId4), childrenOfGroup4);
+
+    // 2 more original segments gets uploaded.
+    String groupId5 = "G5";
+    List<String> groupSegments5 = Arrays.asList("segment 5");
+    segmentMergeLineage.addSegmentGroup(groupId5, groupSegments5, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId5), groupSegments5);
+
+    String groupId6 = "G6";
+    List<String> groupSegments6 = Arrays.asList("segment 6");
+    segmentMergeLineage.addSegmentGroup(groupId6, groupSegments6, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId6), groupSegments6);
+
+    // Let's say a merge task merges segment 3,5,6 into 7,8.
+    String groupId7 = "G7";
+    List<String> groupSegments7 = Arrays.asList("segment7", "segment8");
+    List<String> childrenOfGroup7 = Arrays.asList(groupId3, groupId5, groupId6);
+    segmentMergeLineage.addSegmentGroup(groupId7, groupSegments7, childrenOfGroup7);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId7), groupSegments7);
+    Assert.assertEquals(segmentMergeLineage.getChildrenForGroup(groupId7), childrenOfGroup7);
+
+    // Check available APIs
+    Assert.assertEquals(segmentMergeLineage.getTableName(), "test_OFFLINE");
+    Assert.assertEquals(segmentMergeLineage.getAllGroupLevels(), Arrays.asList(0, 1));
+    // Group ids of a level come out of a hash map key set, so compare them as sets: the order of the
+    // returned list is not something the test may depend on.
+    Assert.assertEquals(new HashSet<>(segmentMergeLineage.getGroupIdsForGroupLevel(0)),
+        new HashSet<>(Arrays.asList(groupId1, groupId2, groupId3, groupId5, groupId6)));
+    Assert.assertEquals(new HashSet<>(segmentMergeLineage.getGroupIdsForGroupLevel(1)),
+        new HashSet<>(Arrays.asList(groupId4, groupId7)));
+    validateSegmentGroup(segmentMergeLineage);
+
+    // Check ZNRecord conversion
+    Assert.assertEquals(segmentMergeLineage, SegmentMergeLineage.fromZNRecord(segmentMergeLineage.toZNRecord()));
+
+    // Test removing groups
+    segmentMergeLineage.removeSegmentGroup(groupId1);
+    Assert.assertNull(segmentMergeLineage.getChildrenForGroup(groupId1));
+    Assert.assertNull(segmentMergeLineage.getSegmentsForGroup(groupId1));
+    Assert.assertFalse(segmentMergeLineage.getGroupIdsForGroupLevel(0).contains(groupId1));
+  }
+
+  /**
+   * Adding a second group under an id that is already in use must be rejected with an
+   * InvalidConfigException.
+   */
+  @Test
+  public void testUpdateWithDuplicateGroupId() throws Exception {
+    SegmentMergeLineage segmentMergeLineage = new SegmentMergeLineage("test_OFFLINE");
+    String groupId1 = "G1";
+    // Three separate segment names. The lists used to hold one single string "segment1, segment2,
+    // segment3", i.e. one oddly named segment that contains the serialization delimiter.
+    List<String> groupSegments1 = Arrays.asList("segment1", "segment2", "segment3");
+    segmentMergeLineage.addSegmentGroup(groupId1, groupSegments1, null);
+    Assert.assertEquals(segmentMergeLineage.getSegmentsForGroup(groupId1), groupSegments1);
+
+    List<String> groupSegments2 = Arrays.asList("segment4", "segment5", "segment6");
+    try {
+      segmentMergeLineage.addSegmentGroup(groupId1, groupSegments2, null);
+      Assert.fail();
+    } catch (InvalidConfigException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Builds the lineage tree and validates every subtree hanging under its root node. The root node
+   * itself is not validated.
+   */
+  private void validateSegmentGroup(SegmentMergeLineage segmentMergeLineage) {
+    segmentMergeLineage.getMergeLineageRootSegmentGroup()
+        .getChildrenGroups()
+        .forEach(topLevelGroup -> validateSegmentGroupNode(topLevelGroup, segmentMergeLineage));
+  }
+
+  /**
+   * Checks that a tree node agrees with the lineage it was built from (segments, level membership and,
+   * if the node has children, the children ids in order), then validates each child recursively.
+   */
+  private void validateSegmentGroupNode(SegmentGroup segmentGroup, SegmentMergeLineage segmentMergeLineage) {
+    String nodeId = segmentGroup.getGroupId();
+    Assert.assertEquals(segmentGroup.getSegments(), new HashSet<>(segmentMergeLineage.getSegmentsForGroup(nodeId)));
+    Assert.assertTrue(
+        segmentMergeLineage.getGroupIdsForGroupLevel(segmentGroup.getGroupLevel()).contains(nodeId));
+
+    List<SegmentGroup> childNodes = segmentGroup.getChildrenGroups();
+    if (childNodes == null) {
+      // Leaf node, nothing below it to check
+      return;
+    }
+
+    List<String> childNodeIds = new ArrayList<>();
+    childNodes.forEach(childNode -> childNodeIds.add(childNode.getGroupId()));
+    Assert.assertEquals(childNodeIds, segmentMergeLineage.getChildrenForGroup(nodeId));
+
+    for (SegmentGroup childNode : segmentGroup.getChildrenGroups()) {
+      validateSegmentGroupNode(childNode, segmentMergeLineage);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]