sv2000 commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492218836
##########
File path:
gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
##########
@@ -411,7 +415,7 @@ public void handleMessage(ControlMessage message) {
/**
* Get the serialized key to partitions info in {@link #state}
*/
- private static String getPartitionsKey(int branchId) {
+ public static String getPartitionsKey(int branchId) {
Review comment:
This is needed in LinkedIn's internal publisher. Hence the change.
##########
File path:
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
System.arraycopy(objects, 0, args, 1, objects.length);
return LineageEventBuilder.getKey(args);
}
+
+ private static String getDestinationKey(int branchId) {
+ return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+ }
+ /**
+ * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+ * lineage information from the state.
+ * @param state
+ * @param branchId
+ */
+ public static void removeDestinationProp(State state, int branchId) {
Review comment:
Same as above: this method is public because LinkedIn's internal publisher needs it.
##########
File path:
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
System.arraycopy(objects, 0, args, 1, objects.length);
return LineageEventBuilder.getKey(args);
}
+
+ private static String getDestinationKey(int branchId) {
+ return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+ }
+ /**
+ * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+ * lineage information from the state.
+ * @param state
+ * @param branchId
+ */
+ public static void removeDestinationProp(State state, int branchId) {
+ String destinationKey = getDestinationKey(branchId);
+ if (state.contains(destinationKey)) {
+ state.removeProp(destinationKey);
+ }
+ }
+
+ /**
+ * Group states by lineage event name (i.e. the dataset name). Used for de-duping LineageEvents for a given dataset.
+ * @param states
+ * @return a map of {@link WorkUnitState}s keyed by dataset name.
+ */
+ public static Map<String, Collection<WorkUnitState>> aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {
Review comment:
Same comment as above: this method is public because LinkedIn's internal publisher needs it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org