[
https://issues.apache.org/jira/browse/GOBBLIN-1266?focusedWorklogId=487070&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-487070
]
ASF GitHub Bot logged work on GOBBLIN-1266:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Sep/20 16:45
Start Date: 21/Sep/20 16:45
Worklog Time Spent: 10m
Work Description: autumnust commented on a change in pull request #3107:
URL: https://github.com/apache/incubator-gobblin/pull/3107#discussion_r492198456
##########
File path:
gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
##########
@@ -411,7 +415,7 @@ public void handleMessage(ControlMessage message) {
/**
* Get the serialized key to partitions info in {@link #state}
*/
- private static String getPartitionsKey(int branchId) {
+ public static String getPartitionsKey(int branchId) {
Review comment:
Why is this being changed to public?
##########
File path:
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
System.arraycopy(objects, 0, args, 1, objects.length);
return LineageEventBuilder.getKey(args);
}
+
+ private static String getDestinationKey(int branchId) {
+ return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+ }
+ /**
+ * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+ * lineage information from the state.
+ * @param state
+ * @param branchId
+ */
+ public static void removeDestinationProp(State state, int branchId) {
Review comment:
Where is this being used?
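To make the question concrete, here is a minimal sketch of the behaviour the Javadoc describes: dropping one branch's destination entry while leaving the others in place. It uses java.util.Properties as a stand-in for Gobblin's State. The class name, the key format in destinationKey, and the sample values are assumptions for illustration only; the real key is built by LineageInfo.getKey(...).

```java
import java.util.Properties;

public class DestinationPropSketch {
  // Hypothetical key layout, NOT the actual Gobblin key format.
  static String destinationKey(int branchId) {
    return "branch." + branchId + ".destination";
  }

  // Removes only the destination entry of the given branch.
  // Properties.remove is a no-op when the key is absent.
  static void removeDestinationProp(Properties state, int branchId) {
    state.remove(destinationKey(branchId));
  }

  public static void main(String[] args) {
    Properties state = new Properties();
    state.setProperty(destinationKey(0), "datasetA");
    state.setProperty(destinationKey(1), "datasetB");

    // Purge lineage destination info for branch 0 only.
    removeDestinationProp(state, 0);

    System.out.println(state.containsKey(destinationKey(0)));
    System.out.println(state.getProperty(destinationKey(1)));
  }
}
```

The stand-in needs no contains check because Properties.remove tolerates a missing key; whether State.removeProp behaves the same way is not verified here.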
##########
File path:
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
##########
@@ -303,4 +306,50 @@ private static String getKey(Object... objects) {
System.arraycopy(objects, 0, args, 1, objects.length);
return LineageEventBuilder.getKey(args);
}
+
+ private static String getDestinationKey(int branchId) {
+ return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+ }
+ /**
+ * Remove the destination property from the state object. Used in streaming mode, where we want to selectively purge
+ * lineage information from the state.
+ * @param state
+ * @param branchId
+ */
+ public static void removeDestinationProp(State state, int branchId) {
+ String destinationKey = getDestinationKey(branchId);
+ if (state.contains(destinationKey)) {
+ state.removeProp(destinationKey);
+ }
+ }
+
+ /**
+ * Group states by lineage event name (i.e the dataset name). Used for de-duping LineageEvents for a given dataset.
+ * @param states
+ * @return a map of {@link WorkUnitState}s keyed by dataset name.
+ */
+ public static Map<String, Collection<WorkUnitState>> aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {
Review comment:
Is the signature change necessary?
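For reference, a minimal sketch of the grouping the Javadoc describes: states bucketed by dataset name, so that one lineage event can be produced per dataset. StateStub and groupByDataset are hypothetical stand-ins for WorkUnitState and aggregateByLineageEvent, and this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LineageGroupingSketch {
  // Hypothetical stand-in for WorkUnitState: carries only a dataset name and an id.
  static class StateStub {
    final String datasetName;
    final int id;

    StateStub(String datasetName, int id) {
      this.datasetName = datasetName;
      this.id = id;
    }
  }

  // Groups states by dataset name, preserving first-seen order of the names.
  static Map<String, Collection<StateStub>> groupByDataset(Collection<? extends StateStub> states) {
    Map<String, Collection<StateStub>> grouped = new LinkedHashMap<>();
    for (StateStub state : states) {
      grouped.computeIfAbsent(state.datasetName, name -> new ArrayList<>()).add(state);
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<StateStub> states = Arrays.asList(
        new StateStub("datasetA", 1),
        new StateStub("datasetB", 2),
        new StateStub("datasetA", 3));

    // Each entry holds every state that belongs to one dataset.
    for (Map.Entry<String, Collection<StateStub>> entry : groupByDataset(states).entrySet()) {
      System.out.println(entry.getKey() + " has " + entry.getValue().size() + " state(s)");
    }
  }
}
```

In plain Java terms, the `? extends` wildcard on the parameter lets callers pass a collection of any subtype of the element type; whether the PR needs that flexibility is for the author to confirm.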
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 487070)
Time Spent: 0.5h (was: 20m)
> Refactor dataset lineage code to allow Lineage event emission in streaming
> mode
> --------------------------------------------------------------------------------
>
> Key: GOBBLIN-1266
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1266
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-core
> Affects Versions: 0.15.0
> Reporter: Sudarshan Vasudevan
> Assignee: Abhishek Tiwari
> Priority: Major
> Fix For: 0.16.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Dataset lineage emission in the current Gobblin code is only available in
> batch mode. For instance, lineage events are emitted on dataset commit,
> which is never invoked for long-running tasks. This PR refactors the
> lineage-related code so that it can also be used in streaming mode.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)