rohitsinha54 commented on code in PR #32090:
URL: https://github.com/apache/beam/pull/32090#discussion_r1715833797


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java:
##########
@@ -19,26 +19,110 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Standard collection of metrics used to record source and sinks information 
for lineage tracking.
  */
 public class Lineage {
+
   public static final String LINEAGE_NAMESPACE = "lineage";
-  private static final StringSet SOURCES =
-      Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
-  private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, 
Type.SINK.toString());
+  private static final Lineage SOURCES = new Lineage(Type.SOURCE);
+  private static final Lineage SINKS = new Lineage(Type.SINK);
+
+  private final StringSet metric;
 
-  /** {@link StringSet} representing sources and optionally side inputs. */
-  public static StringSet getSources() {
+  private Lineage(Type type) {
+    this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
+  }
+
+  /** {@link Lineage} representing sources and optionally side inputs. */
+  public static Lineage getSources() {
     return SOURCES;
   }
 
-  /** {@link StringSet} representing sinks. */
-  public static StringSet getSinks() {
+  /** {@link Lineage} representing sinks. */
+  public static Lineage getSinks() {
     return SINKS;
   }
 
+  private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]");

Review Comment:
   nit: move this up so it sits with the other `private static final` fields.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -109,12 +108,12 @@ public List<BigQueryStorageStreamSource<T>> split(
     @Nullable Table targetTable = getTargetTable(bqOptions);
 
     ReadSession.Builder readSessionBuilder = ReadSession.newBuilder();
-    StringSet lineageSources = Lineage.getSources();
+    Lineage lineage = Lineage.getSources();
     if (targetTable != null) {
       TableReference tableReference = targetTable.getTableReference();
       
readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
       // register the table as lineage source
-      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, 
bqOptions));
+      lineage.add("bigquery", 
BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));

Review Comment:
   Nice, this change will also help in the future with OpenLineage integration, 
where the system is the namespace and the remaining part of the FQN is the 
name: https://screenshot.googleplex.com/Aj9hUyPnVtUTy7h



##########
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java:
##########
@@ -624,6 +625,11 @@ protected S3ResourceId matchNewResource(String 
singleResourceSpec, boolean isDir
     return S3ResourceId.fromUri(singleResourceSpec);
   }
 
+  @Override
+  protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+    lineage.add("s3", ImmutableList.of(resourceId.getBucket(), 
resourceId.getKey()));

Review Comment:
   Is a relative path not possible here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to