rohitsinha54 commented on code in PR #32090: URL: https://github.com/apache/beam/pull/32090#discussion_r1715833797
########## sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java: ########## @@ -19,26 +19,110 @@ import java.util.HashSet; import java.util.Set; +import java.util.regex.Pattern; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Standard collection of metrics used to record source and sinks information for lineage tracking. */ public class Lineage { + public static final String LINEAGE_NAMESPACE = "lineage"; - private static final StringSet SOURCES = - Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString()); - private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, Type.SINK.toString()); + private static final Lineage SOURCES = new Lineage(Type.SOURCE); + private static final Lineage SINKS = new Lineage(Type.SINK); + + private final StringSet metric; - /** {@link StringSet} representing sources and optionally side inputs. */ - public static StringSet getSources() { + private Lineage(Type type) { + this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString()); + } + + /** {@link Lineage} representing sources and optionally side inputs. */ + public static Lineage getSources() { return SOURCES; } - /** {@link StringSet} representing sinks. */ - public static StringSet getSinks() { + /** {@link Lineage} representing sinks. 
*/ + public static Lineage getSinks() { return SINKS; } + private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]"); Review Comment: nit: move this up next to the other private static final fields ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java: ########## @@ -109,12 +108,12 @@ public List<BigQueryStorageStreamSource<T>> split( @Nullable Table targetTable = getTargetTable(bqOptions); ReadSession.Builder readSessionBuilder = ReadSession.newBuilder(); - StringSet lineageSources = Lineage.getSources(); + Lineage lineage = Lineage.getSources(); if (targetTable != null) { TableReference tableReference = targetTable.getTableReference(); readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference)); // register the table as lineage source - lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions)); + lineage.add("bigquery", BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions)); Review Comment: Nice, this change will also help in the future with OpenLineage integration, where the system is the namespace and the remaining part of the FQN is the name. https://screenshot.googleplex.com/Aj9hUyPnVtUTy7h ########## sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java: ########## @@ -624,6 +625,11 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir return S3ResourceId.fromUri(singleResourceSpec); } + @Override + protected void reportLineage(S3ResourceId resourceId, Lineage lineage) { + lineage.add("s3", ImmutableList.of(resourceId.getBucket(), resourceId.getKey())); Review Comment: is the relative path not possible here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org