rohitsinha54 commented on code in PR #33891:
URL: https://github.com/apache/beam/pull/33891#discussion_r1992817605


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java:
##########
@@ -50,111 +55,130 @@ public static Lineage getSinks() {
     return SINKS;
   }
 
-  /**
-   * Wrap segment to valid segment name.
-   *
-   * <p>Specifically, If there are reserved chars (colon, whitespace, dot), 
escape with backtick. If
-   * the segment is already wrapped, return the original.
-   *
-   * <p>This helper method is for internal and testing usage only.
-   */
-  @Internal
-  public static String wrapSegment(String value) {
-    if (value.startsWith("`") && value.endsWith("`")) {
-      return value;
-    }
-    if (RESERVED_CHARS.matcher(value).find()) {
-      return String.format("`%s`", value);
-    }
-    return value;
-  }
-
-  /**
-   * Assemble fully qualified name (<a
-   * 
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names";>FQN</a>).
 Format:
-   *
-   * <ul>
-   *   <li>{@code system:segment1.segment2}
-   *   <li>{@code system:subtype:segment1.segment2}
-   *   <li>{@code system:`segment1.with.dots:clons`.segment2}
-   * </ul>
-   *
-   * <p>This helper method is for internal and testing usage only.
-   */
-  @Internal
-  public static String getFqName(
-      String system, @Nullable String subtype, Iterable<String> segments) {
-    StringBuilder builder = new StringBuilder(system);
-    if (!Strings.isNullOrEmpty(subtype)) {
-      builder.append(":").append(subtype);
+  @VisibleForTesting
+  static Iterator<String> getFQNParts(
+      String system,
+      @Nullable String subtype,
+      List<String> segments,
+      @Nullable String lastSegmentSep) {
+
+    List<String> parts = new ArrayList<>();
+    parts.add(system + ":");
+    if (subtype != null) {
+      parts.add(subtype + ":");
     }
-    int idx = 0;
-    for (String segment : segments) {
-      if (idx == 0) {
-        builder.append(":");
+    if (segments != null && segments.size() > 0) {
+      for (int i = 0; i < segments.size() - 1; i++) {
+        parts.add(wrapSegment(segments.get(i)) + ".");
+      }
+      if (lastSegmentSep != null) {
+        List<String> subSegments =
+            Splitter.onPattern(lastSegmentSep)
+                .splitToList(wrapSegment(segments.get(segments.size() - 1)));
+        for (int i = 0; i < subSegments.size() - 1; i++) {
+          parts.add(subSegments.get(i) + lastSegmentSep);
+        }
+        parts.add(subSegments.get(subSegments.size() - 1));
       } else {
-        builder.append(".");
+        parts.add(segments.get(segments.size() - 1));
       }
-      builder.append(wrapSegment(segment));
-      ++idx;
     }
-    return builder.toString();
+    return parts.iterator();
   }
 
   /**
-   * Assemble the FQN of given system, and segments.
-   *
-   * <p>This helper method is for internal and testing usage only.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
-  @Internal
-  public static String getFqName(String system, Iterable<String> segments) {
-    return getFqName(system, null, segments);
+  public void add(
+      String system,
+      @Nullable String subtype,
+      Iterable<String> segments,
+      @Nullable String lastSegmentSep) {
+    List<String> result = new ArrayList<String>();
+    segments.forEach(result::add);
+
+    add(getFQNParts(system, subtype, result, lastSegmentSep));
   }
 
   /**
-   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
-  public void add(String system, @Nullable String subtype, Iterable<String> 
segments) {
-    add(getFqName(system, subtype, segments));
+  public void add(String system, Iterable<String> segments, @Nullable String 
lastSegmentSep) {
+    add(system, null, segments, lastSegmentSep);
   }
 
   /**
-   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link
+   * #getFQNParts}.
    */
   public void add(String system, Iterable<String> segments) {
-    add(system, null, segments);
+    add(system, segments, null);
   }
 
   /**
-   * Adds the given details as Lineage. For asset level lineage the resource 
location should be
-   * specified as Dataplex FQN 
https://cloud.google.com/data-catalog/docs/fully-qualified-names
+   * Adds the given fqn as lineage.
+   *
+   * @param rollupSegments should be an iterable of strings whose 
concatenation is a valid <a

Review Comment:
   Good catch. Thanks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to