rohitsinha54 commented on code in PR #33891:
URL: https://github.com/apache/beam/pull/33891#discussion_r1992818823


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java:
##########
@@ -50,111 +55,130 @@ public static Lineage getSinks() {
     return SINKS;
   }
 
-  /**
-   * Wrap segment to valid segment name.
-   *
-   * <p>Specifically, If there are reserved chars (colon, whitespace, dot), 
escape with backtick. If
-   * the segment is already wrapped, return the original.
-   *
-   * <p>This helper method is for internal and testing usage only.
-   */
-  @Internal
-  public static String wrapSegment(String value) {
-    if (value.startsWith("`") && value.endsWith("`")) {
-      return value;
-    }
-    if (RESERVED_CHARS.matcher(value).find()) {
-      return String.format("`%s`", value);
-    }
-    return value;
-  }
-
-  /**
-   * Assemble fully qualified name (<a
-   * 
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names";>FQN</a>).
 Format:
-   *
-   * <ul>
-   *   <li>{@code system:segment1.segment2}
-   *   <li>{@code system:subtype:segment1.segment2}
-   *   <li>{@code system:`segment1.with.dots:clons`.segment2}
-   * </ul>
-   *
-   * <p>This helper method is for internal and testing usage only.
-   */
-  @Internal
-  public static String getFqName(
-      String system, @Nullable String subtype, Iterable<String> segments) {
-    StringBuilder builder = new StringBuilder(system);
-    if (!Strings.isNullOrEmpty(subtype)) {
-      builder.append(":").append(subtype);
+  @VisibleForTesting
+  static Iterator<String> getFQNParts(
+      String system,
+      @Nullable String subtype,
+      List<String> segments,
+      @Nullable String lastSegmentSep) {
+
+    List<String> parts = new ArrayList<>();
+    parts.add(system + ":");
+    if (subtype != null) {
+      parts.add(subtype + ":");
     }
-    int idx = 0;
-    for (String segment : segments) {
-      if (idx == 0) {
-        builder.append(":");
+    if (segments != null && segments.size() > 0) {
+      for (int i = 0; i < segments.size() - 1; i++) {
+        parts.add(wrapSegment(segments.get(i)) + ".");
+      }
+      if (lastSegmentSep != null) {
+        List<String> subSegments =
+            Splitter.onPattern(lastSegmentSep)
+                .splitToList(wrapSegment(segments.get(segments.size() - 1)));
+        for (int i = 0; i < subSegments.size() - 1; i++) {
+          parts.add(subSegments.get(i) + lastSegmentSep);
+        }
+        parts.add(subSegments.get(subSegments.size() - 1));
       } else {
-        builder.append(".");
+        parts.add(segments.get(segments.size() - 1));
       }
-      builder.append(wrapSegment(segment));
-      ++idx;
     }
-    return builder.toString();
+    return parts.iterator();
   }
 
   /**
-   * Assemble the FQN of given system, and segments.
-   *
-   * <p>This helper method is for internal and testing usage only.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
-  @Internal
-  public static String getFqName(String system, Iterable<String> segments) {
-    return getFqName(system, null, segments);
+  public void add(
+      String system,
+      @Nullable String subtype,
+      Iterable<String> segments,
+      @Nullable String lastSegmentSep) {
+    List<String> result = new ArrayList<String>();
+    segments.forEach(result::add);
+
+    add(getFQNParts(system, subtype, result, lastSegmentSep));
   }
 
   /**
-   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
-  public void add(String system, @Nullable String subtype, Iterable<String> 
segments) {
-    add(getFqName(system, subtype, segments));
+  public void add(String system, Iterable<String> segments, @Nullable String 
lastSegmentSep) {
+    add(system, null, segments, lastSegmentSep);
   }
 
   /**
-   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
   public void add(String system, Iterable<String> segments) {
-    add(system, null, segments);
+    add(system, segments, null);
   }
 
   /**
-   * Adds the given details as Lineage. For asset level lineage the resource 
location should be
-   * specified as Dataplex FQN 
https://cloud.google.com/data-catalog/docs/fully-qualified-names
+   * Adds the given fqn as lineage.
+   *
+   * @param rollupSegments should be an iterable of strings whose 
concatenation is a valid <a
+   *     
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names";>Dataplex
 FQN </a>
+   *     which is already escaped.
+   *     <p>In particular, this means they will often have trailing delimiters.
    */
-  public void add(String details) {
-    metric.add(details);
+  public void add(Iterator<String> rollupSegments) {
+    List<String> segments = new ArrayList<>();
+    rollupSegments.forEachRemaining(segments::add);
+    this.metric.add(segments);

Review Comment:
   Done



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java:
##########
@@ -50,111 +55,130 @@ public static Lineage getSinks() {
     return SINKS;
   }
 
-  /**
-   * Wrap segment to valid segment name.
-   *
-   * <p>Specifically, If there are reserved chars (colon, whitespace, dot), 
escape with backtick. If
-   * the segment is already wrapped, return the original.
-   *
-   * <p>This helper method is for internal and testing usage only.
-   */
-  @Internal
-  public static String wrapSegment(String value) {
-    if (value.startsWith("`") && value.endsWith("`")) {
-      return value;
-    }
-    if (RESERVED_CHARS.matcher(value).find()) {
-      return String.format("`%s`", value);
-    }
-    return value;
-  }
-
-  /**
-   * Assemble fully qualified name (<a
-   * 
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names";>FQN</a>).
 Format:
-   *
-   * <ul>
-   *   <li>{@code system:segment1.segment2}
-   *   <li>{@code system:subtype:segment1.segment2}
-   *   <li>{@code system:`segment1.with.dots:clons`.segment2}
-   * </ul>
-   *
-   * <p>This helper method is for internal and testing usage only.
-   */
-  @Internal
-  public static String getFqName(
-      String system, @Nullable String subtype, Iterable<String> segments) {
-    StringBuilder builder = new StringBuilder(system);
-    if (!Strings.isNullOrEmpty(subtype)) {
-      builder.append(":").append(subtype);
+  @VisibleForTesting
+  static Iterator<String> getFQNParts(
+      String system,
+      @Nullable String subtype,
+      List<String> segments,
+      @Nullable String lastSegmentSep) {
+
+    List<String> parts = new ArrayList<>();
+    parts.add(system + ":");
+    if (subtype != null) {
+      parts.add(subtype + ":");
     }
-    int idx = 0;
-    for (String segment : segments) {
-      if (idx == 0) {
-        builder.append(":");
+    if (segments != null && segments.size() > 0) {
+      for (int i = 0; i < segments.size() - 1; i++) {
+        parts.add(wrapSegment(segments.get(i)) + ".");
+      }
+      if (lastSegmentSep != null) {
+        List<String> subSegments =
+            Splitter.onPattern(lastSegmentSep)
+                .splitToList(wrapSegment(segments.get(segments.size() - 1)));
+        for (int i = 0; i < subSegments.size() - 1; i++) {
+          parts.add(subSegments.get(i) + lastSegmentSep);
+        }
+        parts.add(subSegments.get(subSegments.size() - 1));
       } else {
-        builder.append(".");
+        parts.add(segments.get(segments.size() - 1));
       }
-      builder.append(wrapSegment(segment));
-      ++idx;
     }
-    return builder.toString();
+    return parts.iterator();
   }
 
   /**
-   * Assemble the FQN of given system, and segments.
-   *
-   * <p>This helper method is for internal and testing usage only.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
-  @Internal
-  public static String getFqName(String system, Iterable<String> segments) {
-    return getFqName(system, null, segments);
+  public void add(
+      String system,
+      @Nullable String subtype,
+      Iterable<String> segments,
+      @Nullable String lastSegmentSep) {
+    List<String> result = new ArrayList<String>();
+    segments.forEach(result::add);
+
+    add(getFQNParts(system, subtype, result, lastSegmentSep));
   }
 
   /**
-   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
-  public void add(String system, @Nullable String subtype, Iterable<String> 
segments) {
-    add(getFqName(system, subtype, segments));
+  public void add(String system, Iterable<String> segments, @Nullable String 
lastSegmentSep) {
+    add(system, null, segments, lastSegmentSep);
   }
 
   /**
-   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
   public void add(String system, Iterable<String> segments) {
-    add(system, null, segments);
+    add(system, segments, null);
   }
 
   /**
-   * Adds the given details as Lineage. For asset level lineage the resource 
location should be
-   * specified as Dataplex FQN 
https://cloud.google.com/data-catalog/docs/fully-qualified-names
+   * Adds the given fqn as lineage.
+   *
+   * @param rollupSegments should be an iterable of strings whose 
concatenation is a valid <a
+   *     
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names";>Dataplex
 FQN </a>
+   *     which is already escaped.
+   *     <p>In particular, this means they will often have trailing delimiters.
    */
-  public void add(String details) {
-    metric.add(details);
+  public void add(Iterator<String> rollupSegments) {
+    List<String> segments = new ArrayList<>();
+    rollupSegments.forEachRemaining(segments::add);
+    this.metric.add(segments);
   }
 
-  /** Query {@link StringSet} metrics from {@link MetricResults}. */
-  public static Set<String> query(MetricResults results, Type type) {
+  /**
+   * Query {@link BoundedTrie} metrics from {@link MetricResults}.
+   *
+   * @param results FQNs from the result.
+   * @param type sources or sinks.
+   * @param truncatedMarker the marker to use to represent truncated FQNs.
+   * @return A flat representation of all FQNs. If the FQN was truncated then 
it has a trailing
+   *     truncatedMarker.
+   */
+  public static Set<String> query(MetricResults results, Type type, String 
truncatedMarker) {
     MetricsFilter filter =
         MetricsFilter.builder()
             .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, 
type.toString()))
             .build();
     Set<String> result = new HashSet<>();
-    for (MetricResult<StringSetResult> metrics : 
results.queryMetrics(filter).getStringSets()) {
+    truncatedMarker = truncatedMarker == null ? "*" : truncatedMarker;
+    for (MetricResult<BoundedTrieResult> metrics : 
results.queryMetrics(filter).getBoundedTries()) {
       try {
-        result.addAll(metrics.getCommitted().getStringSet());
+        for (List<String> fqn : metrics.getCommitted().getResult()) {
+          String end = Boolean.parseBoolean(fqn.get(fqn.size() - 1)) ? 
truncatedMarker : "";

Review Comment:
   Good idea. We can update it to do so.
   I would like to do it in a follow-up PR after this one, once I finish the service-side DFE plumbing for it. It is an internal cleanup with no customer-visible impact. I hope that's OK with you.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to