This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cf67652b6d [Real-time Table Replication X clusters][1/n] Creating new 
tables with designated consuming segments (#17235)
4cf67652b6d is described below

commit 4cf67652b6d30864c7e475c27c741ec8a85b5791
Author: Xuanyi Li <[email protected]>
AuthorDate: Thu Jan 15 14:56:14 2026 -0800

    [Real-time Table Replication X clusters][1/n] Creating new tables with 
designated consuming segments (#17235)
    
    * [Pinot Copy Table] induct the watermark of consuming status
    
    * [Pinot Copy Table] create first consuming segments per partition
    
    * [Pinot Copy Table] copy table with designated tenant and watermarks
    
    Reviewers: #ldap_rta-group, ureview, muller.liu
    
    Reviewed By: #ldap_rta-group, muller.liu
    
    Tags: #has_java
    
    JIRA Issues: EVA-12009
    
    Differential Revision: https://code.uberinternal.com/D20152399
    
    * Add licenses
    
    * rename watermark endpoint; private copyTablePayload
    
    * fix the unit test
    
    * Add logs for copying consuming segments
    
    * add more logs
    
    * add http:// to the uri
    
    * instanceAssignmentConfig tag compatible with lower jackson version
    
    * make test tenant name more generic
    
    * enforce user input http:// or https://
    
    * null check or comment why skip null check for mandatory field
    
    * make private field for watermarkResults
    
    * offline, upsert not supported
    
    * cosmetic change with more comments
    
    * postpone schema validation
    
    * check table existence at the beginning
    
    * add consumerWatermarks url builder
    
    * add schema and realtime table config into the response
    
    * introduce dryrun mode
    
    * add license
    
    * mvn spotless:apply
    
    * remove redundant log
    
    * move knobs to path parameter
    
    * Fix JavaDoc inconsistencies for consumer watermarks API
    
    - Updated PinotRealtimeTableResource.java: Changed misleading JavaDoc from
      "Get table ideal state" to accurately describe the consumer watermarks 
endpoint
    - Updated PinotHelixResourceManager.java: Added explicit documentation 
clarifying
      that the API is restricted to realtime tables only and works with both 
upsert
      and non-upsert tables
    
    * Replace PartitionGroupInfo with Pair<PartitionGroupMetadata, Integer>
    
    - Updated PinotHelixResourceManager.addTable() to accept 
List<Pair<PartitionGroupMetadata, Integer>>
    - Updated PinotLLCRealtimeSegmentManager.setUpNewTable() to use Pair 
instead of PartitionGroupInfo
    - Refactored setupNewPartitionGroup() to accept metadata and sequence 
separately
    - Updated PinotTableRestletResource copyTable endpoint to create Pair 
instances
    - Updated unit tests in PinotHelixResourceManagerStatelessTest and 
PartitionGroupInfoTest
    - Added necessary imports for PartitionGroupMetadata and Pair
    
    * Remove unused PartitionGroupInfo class and test
    
    - Deleted PartitionGroupInfo.java as it has been replaced with 
Pair<PartitionGroupMetadata, Integer>
    - Deleted PartitionGroupInfoTest.java as the class is no longer needed
    - Verified no other classes reference PartitionGroupInfo
    
    * javadoc
---
 .../controller/api/resources/CopyTablePayload.java |  88 +++++++++++++
 .../api/resources/CopyTableResponse.java           |  99 ++++++++++++++
 .../api/resources/PinotRealtimeTableResource.java  |  22 ++++
 .../api/resources/PinotTableRestletResource.java   | 142 +++++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  79 +++++++++++-
 .../helix/core/WatermarkInductionResult.java       | 111 ++++++++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  42 ++++--
 .../resources/PinotTableRestletResourceTest.java   |  49 +++++++
 .../PinotHelixResourceManagerStatelessTest.java    | 119 +++++++++++++++++
 .../table_config_with_instance_assignment.json     |  15 +++
 .../utils/builder/ControllerRequestURLBuilder.java |   4 +
 .../builder/ControllerRequestURLBuilderTest.java   |  34 +++++
 12 files changed, 793 insertions(+), 11 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java
new file mode 100644
index 00000000000..8583ff36a3c
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  private String _sourceClusterUri;
+  private Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  private String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  private String _serverTenant;
+
+  /**
+   * The instanceAssignmentConfig's tagPoolConfig contains full tenant name. 
We will use this field to let user specify
+   * the replacement relation from source cluster's full tenant to target 
cluster's full tenant.
+   */
+  private Map<String, String> _tagPoolReplacementMap;
+
+  @JsonCreator
+  public CopyTablePayload(
+      @JsonProperty(value = "sourceClusterUri", required = true) String 
sourceClusterUri,
+      @JsonProperty("sourceClusterHeaders") Map<String, String> headers,
+      @JsonProperty(value = "brokerTenant", required = true) String 
brokerTenant,
+      @JsonProperty(value = "serverTenant", required = true) String 
serverTenant,
+      @JsonProperty("tagPoolReplacementMap") @Nullable Map<String, String> 
tagPoolReplacementMap) {
+    _sourceClusterUri = sourceClusterUri;
+    _headers = headers;
+    _brokerTenant = brokerTenant;
+    _serverTenant = serverTenant;
+    _tagPoolReplacementMap = tagPoolReplacementMap;
+  }
+
+  @JsonGetter("sourceClusterUri")
+  public String getSourceClusterUri() {
+    return _sourceClusterUri;
+  }
+
+  @JsonGetter("sourceClusterHeaders")
+  public Map<String, String> getHeaders() {
+    return _headers;
+  }
+
+  @JsonGetter("brokerTenant")
+  public String getBrokerTenant() {
+    return _brokerTenant;
+  }
+
+  @JsonGetter("serverTenant")
+  public String getServerTenant() {
+    return _serverTenant;
+  }
+
+  @JsonGetter("tagPoolReplacementMap")
+  public Map<String, String> getTagPoolReplacementMap() {
+    return _tagPoolReplacementMap;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java
new file mode 100644
index 00000000000..91b11d5a41d
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CopyTableResponse {
+  @JsonProperty("msg")
+  private String _msg;
+
+  @JsonProperty("status")
+  private String _status;
+
+  @JsonProperty("schema")
+  private Schema _schema;
+
+  @JsonProperty("realtimeTableConfig")
+  private TableConfig _tableConfig;
+
+  @JsonProperty("watermarkInductionResult")
+  private WatermarkInductionResult _watermarkInductionResult;
+
+  @JsonCreator
+  public CopyTableResponse(@JsonProperty("status") String status, 
@JsonProperty("msg") String msg,
+      @JsonProperty("schema") @Nullable Schema schema,
+      @JsonProperty("realtimeTableConfig") @Nullable TableConfig tableConfig,
+      @JsonProperty("watermarkInductionResult") @Nullable 
WatermarkInductionResult watermarkInductionResult) {
+    _status = status;
+    _msg = msg;
+    _schema = schema;
+    _tableConfig = tableConfig;
+    _watermarkInductionResult = watermarkInductionResult;
+  }
+
+  public String getMsg() {
+    return _msg;
+  }
+
+  public void setMsg(String msg) {
+    _msg = msg;
+  }
+
+  public String getStatus() {
+    return _status;
+  }
+
+  public void setStatus(String status) {
+    _status = status;
+  }
+
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  public void setSchema(Schema schema) {
+    _schema = schema;
+  }
+
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  public void setTableConfig(TableConfig tableConfig) {
+    _tableConfig = tableConfig;
+  }
+
+  public WatermarkInductionResult getWatermarkInductionResult() {
+    return _watermarkInductionResult;
+  }
+
+  public void setWatermarkInductionResult(
+      WatermarkInductionResult watermarkInductionResult) {
+    _watermarkInductionResult = watermarkInductionResult;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 2e7ad9b880a..00219cf7a34 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -53,10 +53,12 @@ import javax.ws.rs.core.Response;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.controller.ControllerConf;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
@@ -449,6 +451,26 @@ public class PinotRealtimeTableResource {
     }
   }
 
+
  /**
   * REST endpoint returning, for each partition group of the given realtime table, the next
   * offset to be consumed (delegates to
   * {@code PinotHelixResourceManager#getConsumerWatermarks}).
   *
   * @param tableName name of the realtime table, without the type suffix
   * @param headers request headers, used to resolve the database context of the table name
   * @return the per-partition-group watermarks
   * @throws ControllerApplicationException with 404 if the table does not exist, 500 otherwise
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  @Path("/tables/{tableName}/consumerWatermarks")
  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_IDEAL_STATE)
  @ApiOperation(value = "Get consumer watermarks for a realtime table",
      notes = "Returns the next offset to be consumed for each partition group. Only works for realtime tables.")
  public WatermarkInductionResult getConsumerWatermark(
      @ApiParam(value = "Name of the realtime table", required = true) @PathParam("tableName") String tableName,
      @Context HttpHeaders headers) {
    try {
      // Translate the raw table name using the database context carried in the request headers.
      String table = DatabaseUtils.translateTableName(tableName, headers);
      return _pinotHelixResourceManager.getConsumerWatermarks(table);
    } catch (TableNotFoundException e) {
      // Unknown realtime table maps to 404 rather than a generic 500.
      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND, e);
    } catch (Exception e) {
      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
    }
  }
+
   private Map<String, Set<String>> getInstanceToErrorSegmentsMap(String 
tableNameWithType) {
     ExternalView externalView = 
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
     Preconditions.checkState(externalView != null, "External view does not 
exist for table: " + tableNameWithType);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 2175394555b..b749fddc4b2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import io.swagger.annotations.Api;
@@ -36,6 +37,7 @@ import it.unimi.dsi.fastutil.Arrays;
 import it.unimi.dsi.fastutil.Swapper;
 import it.unimi.dsi.fastutil.ints.IntComparator;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -86,7 +88,9 @@ import 
org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LogicalTableConfigUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.access.AccessType;
@@ -96,6 +100,7 @@ import 
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
 import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -121,9 +126,12 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.controller.ControllerJobType;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.zookeeper.data.Stat;
@@ -283,6 +291,140 @@ public class PinotTableRestletResource {
     }
   }
 
  /**
   * Copies a realtime table from another Pinot cluster into this one: fetches the schema, table
   * config and consumer watermarks from the source controller, rewrites tenants/pool tags per the
   * payload, and (unless dryRun) creates the schema and the table with consuming segments that
   * start at the source's watermarks.
   *
   * <p>Only non-upsert realtime tables are supported; a hybrid source copies the realtime part
   * only, and a pure offline or upsert source is rejected.
   *
   * @param tableName name of the table to create (must not already exist in this cluster)
   * @param payload JSON-serialized {@link CopyTablePayload}
   * @param verbose when true, echo schema/config/watermarks in the response
   * @param dryRun when true (the default), fetch and validate everything but create nothing
   * @param headers request headers, used to resolve the database context of the table name
   * @return a {@link CopyTableResponse} describing the outcome
   * @throws ControllerApplicationException with 500 on any failure (including table-exists)
   */
  @POST
  @Path("/tables/{tableName}/copy")
  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
  @Produces(MediaType.APPLICATION_JSON)
  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
  public CopyTableResponse copyTable(
      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
      @ApiParam(value = "Include verbose information in response")
      @QueryParam("verbose") @DefaultValue("false") boolean verbose,
      @ApiParam(value = "Dry run mode") @QueryParam("dryRun") @DefaultValue("true") boolean dryRun,
      @Context HttpHeaders headers) {
    try {
      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
      tableName = DatabaseUtils.translateTableName(tableName, headers);

      // Fail fast if either flavor of the table already exists in the target cluster.
      if (_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName)) != null
          || _pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName)) != null) {
        throw new TableAlreadyExistsException("Table config for " + tableName
            + " already exists. If this is unexpected, try deleting the table to remove all metadata associated"
            + " with it before attempting to recreate.");
      }

      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
      Map<String, String> requestHeaders = copyTablePayload.getHeaders();

      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);

      ControllerRequestURLBuilder urlBuilder = ControllerRequestURLBuilder.baseUrl(sourceControllerUri);

      // Fetch the schema from the source controller.
      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
      String schemaJson = schemaResponse.getResponse();
      Schema schema = Schema.fromString(schemaJson);

      // Fetch the table config; kept as a JSON tree so the tenant/tag rewrite below can edit it in place.
      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
      String tableConfigJson = tableConfigResponse.getResponse();
      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);

      // Fetch the per-partition consumer watermarks (next offsets + sequence numbers) from the source.
      URI watermarkUri = new URI(urlBuilder.forConsumerWatermarksGet(tableName));
      SimpleHttpResponse watermarkResponse = HttpClient.wrapAndThrowHttpException(
          HttpClient.getInstance().sendGetRequest(watermarkUri, requestHeaders));
      String watermarkJson = watermarkResponse.getResponse();
      LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", tableName, watermarkJson);
      WatermarkInductionResult watermarkInductionResult =
          JsonUtils.stringToObject(watermarkJson, WatermarkInductionResult.class);

      // The /tables/{name} response keys the configs by table type (OFFLINE / REALTIME).
      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
      boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
      if (hasOffline && !hasRealtime) {
        throw new IllegalStateException("pure offline table copy not supported yet");
      }

      // Rewrite tenants and (optionally) instance-assignment pool tags for the target cluster.
      ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
      tweakRealtimeTableConfig(realtimeTableConfigNode, copyTablePayload);
      TableConfig realtimeTableConfig = JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
      if (realtimeTableConfig.getUpsertConfig() != null) {
        throw new IllegalStateException("upsert table copy not supported");
      }
      LOGGER.info("[copyTable] Successfully fetched and tweaked table config for table: {}", tableName);

      // Dry run stops here: everything fetched and validated, nothing created.
      if (dryRun) {
        return new CopyTableResponse("success", "Dry run", schema, realtimeTableConfig, watermarkInductionResult);
      }

      // Convert each watermark into (partition group metadata @ start offset, initial sequence number).
      List<Pair<PartitionGroupMetadata, Integer>> partitionGroupInfos = watermarkInductionResult.getWatermarks()
          .stream()
          .map(watermark -> Pair.of(
              new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset())),
              watermark.getSequenceNumber()))
          .collect(Collectors.toList());

      _pinotHelixResourceManager.addSchema(schema, true, false);
      LOGGER.info("[copyTable] Successfully added schema for table: {}", tableName);
      // Add the table with designated starting kafka offset and segment sequence number to create consuming segments
      _pinotHelixResourceManager.addTable(realtimeTableConfig, partitionGroupInfos);
      LOGGER.info("[copyTable] Successfully added table config: {} with designated high watermark", tableName);
      CopyTableResponse response = new CopyTableResponse("success", "Table copied successfully", null, null, null);
      if (hasOffline) {
        // Hybrid source: warn the caller that only the realtime part was copied.
        response = new CopyTableResponse("warn", "detect offline too; it will only copy real-time segments",
            null, null, null);
      }
      if (verbose) {
        response.setSchema(schema);
        response.setTableConfig(realtimeTableConfig);
        response.setWatermarkInductionResult(watermarkInductionResult);
      }
      return response;
    } catch (Exception e) {
      LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
      throw new ControllerApplicationException(LOGGER, "Error copying table: " + e.getMessage(),
          Response.Status.INTERNAL_SERVER_ERROR, e);
    }
  }
+
+  /**
+   * Helper method to tweak the realtime table config. This method is used to 
set the broker and server tenants, and
+   * optionally replace the pool tags in the instance assignment config.
+   *
+   * @param realtimeTableConfigNode The JSON object representing the realtime 
table config.
+   * @param copyTablePayload The payload containing tenant and tag pool 
replacement information.
+   */
+  @VisibleForTesting
+  static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, 
CopyTablePayload copyTablePayload) {
+    String brokerTenant = copyTablePayload.getBrokerTenant();
+    String serverTenant = copyTablePayload.getServerTenant();
+    Map<String, String> tagPoolReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+    ObjectNode tenantConfig = (ObjectNode) 
realtimeTableConfigNode.get("tenants");
+    tenantConfig.put("broker", brokerTenant);
+    tenantConfig.put("server", serverTenant);
+    if (tagPoolReplacementMap == null || tagPoolReplacementMap.isEmpty()) {
+      return;
+    }
+    JsonNode instanceAssignmentConfigMap = 
realtimeTableConfigNode.get("instanceAssignmentConfigMap");
+    if (instanceAssignmentConfigMap == null) {
+      return;
+    }
+    java.util.Iterator<Map.Entry<String, JsonNode>> iterator = 
instanceAssignmentConfigMap.fields();
+    while (iterator.hasNext()) {
+      Map.Entry<String, JsonNode> entry = iterator.next();
+      JsonNode instanceAssignmentConfig = entry.getValue();
+      ObjectNode tagPoolConfig = (ObjectNode) 
instanceAssignmentConfig.get("tagPoolConfig");
+      String srcTag = tagPoolConfig.get("tag").asText();
+      if (tagPoolReplacementMap.containsKey(srcTag)) {
+        tagPoolConfig.put("tag", tagPoolReplacementMap.get(srcTag));
+      }
+    }
+  }
+
   @PUT
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/tables/recommender")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ef65f6abcfb..78046efc84d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -153,6 +153,8 @@ import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentMa
 import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
 import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
 import org.apache.pinot.controller.workload.QueryWorkloadManager;
+import org.apache.pinot.core.util.NumberUtils;
+import org.apache.pinot.core.util.NumericException;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -172,6 +174,9 @@ import org.apache.pinot.spi.controller.ControllerJobType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -1725,12 +1730,31 @@ public class PinotHelixResourceManager {
  /**
   * Performs validations of table config and adds the table to zookeeper
   * <p>Delegates to {@code addTable(TableConfig, List)} with an empty consuming-metadata list, so a
   * realtime table's first consuming segments are created by the default setup path rather than
   * from designated offsets.
   * @param tableConfig The config for the table to be created.
   * @throws InvalidTableConfigException if validations fail
   * @throws TableAlreadyExistsException if the table already exists
   */
  public void addTable(TableConfig tableConfig)
      throws IOException {
    addTable(tableConfig, Collections.emptyList());
  }
+
+  /**
+   * Performs validations of table config and adds the table to zookeeper
+   * <p>Call this api when you wanted to create a realtime table with 
consuming segments starting to ingest from
+   * designated offset and being assigned with a segment sequence number per 
partition. Otherwise, you should
+   * directly call the {@link #addTable(TableConfig)} which will further call 
this api with an empty list.
+   * @param tableConfig The config for the table to be created.
+   * @param consumeMeta A list of pairs, where each pair contains the 
partition group metadata and the initial sequence
+   *                    number for a consuming segment. This is used to start 
ingestion from a specific offset.
+   * @throws InvalidTableConfigException if validations fail
+   * @throws TableAlreadyExistsException if the table already exists
+   */
+  public void addTable(TableConfig tableConfig, 
List<Pair<PartitionGroupMetadata, Integer>> consumeMeta)
+      throws IOException {
     String tableNameWithType = tableConfig.getTableName();
     LOGGER.info("Adding table {}: Start", tableNameWithType);
+    if (consumeMeta != null && !consumeMeta.isEmpty()) {
+      LOGGER.info("Adding table {} with {} partition group infos", 
tableNameWithType, consumeMeta.size());
+    }
 
     if (getTableConfig(tableNameWithType) != null) {
       throw new TableAlreadyExistsException("Table config for " + 
tableNameWithType
@@ -1789,10 +1813,16 @@ public class PinotHelixResourceManager {
         // Add ideal state
         _helixAdmin.addResource(_helixClusterName, tableNameWithType, 
idealState);
         LOGGER.info("Adding table {}: Added ideal state for offline table", 
tableNameWithType);
-      } else {
+      } else if (consumeMeta == null || consumeMeta.isEmpty()) {
         // Add ideal state with the first CONSUMING segment
         _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
         LOGGER.info("Adding table {}: Added ideal state with first consuming 
segment", tableNameWithType);
+      } else {
+        // Add ideal state with the first CONSUMING segment with designated 
partition consuming metadata
+        // Add ideal state with the first CONSUMING segment
+        _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState, 
consumeMeta);
+        LOGGER.info("Adding table {}: Added consuming segments ideal state 
given the designated consuming metadata",
+                tableNameWithType);
       }
     } catch (Exception e) {
       LOGGER.error("Caught exception while setting up table: {}, cleaning it 
up", tableNameWithType, e);
@@ -4811,6 +4841,51 @@ public class PinotHelixResourceManager {
     return _queryWorkloadManager;
   }
 
+  /**
+   * Retrieves the consumer watermark for a given real-time table.
+   * <p>The watermark represents the next offset to be consumed for each 
partition group.
+   * If the latest segment of a partition is in a DONE state, the watermark is 
the end offset of the completed segment.
+   * Otherwise, it is the start offset of the current consuming segment.
+   *
+   * @param tableName The name of the real-time table (without type suffix).
+   * @return A {@link WatermarkInductionResult} containing a list of 
watermarks for each partition group.
+   * @throws TableNotFoundException if the specified real-time table does not 
exist.
+   * @throws IllegalStateException if the IdealState for the table is not 
found.
+   */
+  public WatermarkInductionResult getConsumerWatermarks(String tableName) 
throws TableNotFoundException {
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    if (!hasRealtimeTable(tableName)) {
+      throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
+    }
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + 
"exists but null tableConfig");
+    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
+    IdealState idealState = _helixAdmin
+        .getResourceIdealState(getHelixClusterName(), tableNameWithType);
+    if (idealState == null) {
+      throw new IllegalStateException("Null IdealState of the table " + 
tableNameWithType);
+    }
+    List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+        .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+    List<WatermarkInductionResult.Watermark> watermarks = 
lst.stream().map(status -> {
+      int seq = status.getSequenceNumber();
+      long startOffset;
+      try {
+        if ("DONE".equalsIgnoreCase(status.getStatus())) {
+          Preconditions.checkNotNull(status.getEndOffset());
+          startOffset = 
NumberUtils.parseLong(status.getEndOffset().toString());
+          seq++;
+        } else {
+          startOffset = 
NumberUtils.parseLong(status.getStartOffset().toString());
+        }
+      } catch (NumericException e) {
+        throw new RuntimeException(e);
+      }
+      return new 
WatermarkInductionResult.Watermark(status.getPartitionGroupId(), seq, 
startOffset);
+    }).collect(Collectors.toList());
+    return new WatermarkInductionResult(watermarks);
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java
new file mode 100644
index 00000000000..4d6a4aa8da0
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list 
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for 
deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> 
watermarks) {
+    _watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+    return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partitionGroupId, sequence, and 
offset.
+   */
+  public static class Watermark {
+    private int _partitionGroupId;
+    private int _sequenceNumber;
+    private long _offset;
+
+    /**
+     * The @JsonCreator annotation tells Jackson to use this constructor to 
create
+     * a WaterMark instance from a JSON object. The @JsonProperty annotations
+     * map the keys in the JSON object to the constructor parameters.
+     *
+     * @param partitionGroupId The ID of the partition group.
+     * @param sequenceNumber The segment sequence number of the consuming 
segment.
+      * @param offset The first Kafka offset whose corresponding record has 
not yet sealed in Pinot
+     */
+    @JsonCreator
+    public Watermark(@JsonProperty("partitionGroupId") int partitionGroupId,
+        @JsonProperty("sequenceNumber") int sequenceNumber, 
@JsonProperty("offset") long offset) {
+      _partitionGroupId = partitionGroupId;
+      _sequenceNumber = sequenceNumber;
+      _offset = offset;
+    }
+
+    /**
+     * Gets the partition group ID.
+     *
+     * @return The partition group ID.
+     */
+    @JsonGetter("partitionGroupId")
+    public int getPartitionGroupId() {
+      return _partitionGroupId;
+    }
+
+    /**
+     * Gets the segment sequence number of the most recent consuming segment.
+     *
+     * @return The segment sequence number.
+     */
+    @JsonGetter("sequenceNumber")
+    public int getSequenceNumber() {
+      return _sequenceNumber;
+    }
+
+    /**
+     * The high-watermark of the Kafka offsets. Any Kafka record with an 
offset greater than or equal to this
+     * value has not yet been sealed.
+     *
+     * @return The offset.
+     */
+    @JsonGetter("offset")
+    public long getOffset() {
+      return _offset;
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5b0619a7ca9..55252182dd6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -56,6 +56,7 @@ import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.HelixAdmin;
@@ -376,6 +377,20 @@ public class PinotLLCRealtimeSegmentManager {
    * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC 
and LLC are configured.
    */
   public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
+    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
+    List<Pair<PartitionGroupMetadata, Integer>> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), idealState).stream().map(
+            x -> Pair.of(x, STARTING_SEQUENCE_NUMBER)
+        ).collect(Collectors.toList());
+    setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
+  }
+
+  /**
+   * Sets up the initial segments for a new LLC real-time table.
+   * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC 
and LLC are configured.
+   */
+  public void setUpNewTable(TableConfig tableConfig, IdealState idealState,
+      List<Pair<PartitionGroupMetadata, Integer>> consumeMeta) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -384,9 +399,7 @@ public class PinotLLCRealtimeSegmentManager {
     List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
     
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), idealState);
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
+    int numPartitionGroups = consumeMeta.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     SegmentAssignment segmentAssignment =
@@ -396,11 +409,13 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
-    for (PartitionGroupMetadata partitionGroupMetadata : 
newPartitionGroupMetadataList) {
+    for (Pair<PartitionGroupMetadata, Integer> pair : consumeMeta) {
+      PartitionGroupMetadata metadata = pair.getLeft();
+      int sequence = pair.getRight();
       StreamConfig streamConfig = 
IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs,
-          partitionGroupMetadata.getPartitionGroupId());
+          metadata.getPartitionGroupId());
       String segmentName =
-          setupNewPartitionGroup(tableConfig, streamConfig, 
partitionGroupMetadata, currentTimeMs, instancePartitions,
+          setupNewPartitionGroup(tableConfig, streamConfig, metadata, 
sequence, currentTimeMs, instancePartitions,
               numPartitionGroups, numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -1903,17 +1918,26 @@ public class PinotLLCRealtimeSegmentManager {
  /**
   * Sets up a new partition group starting at the default starting sequence number.
   *
   * @return the name of the newly created consuming segment
   */
  private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig streamConfig,
      PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
      int numPartitions, int numReplicas) {
    // Delegates to the sequence-aware variant with STARTING_SEQUENCE_NUMBER.
    return setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, STARTING_SEQUENCE_NUMBER,
        creationTimeMs, instancePartitions, numPartitions, numReplicas);
  }
+
+  private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig 
streamConfig,
+      PartitionGroupMetadata partitionGroupMetadata, int sequence, long 
creationTimeMs,
+      InstancePartitions instancePartitions, int numPartitions, int 
numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
     String startOffset = partitionGroupMetadata.getStartOffset().toString();
-    LOGGER.info("Setting up new partition group: {} for table: {}", 
partitionGroupId, realtimeTableName);
+    LOGGER.info("Setting up new partition group: {} for table: {} with 
sequence: {} and startOffset: {}",
+        partitionGroupId, realtimeTableName, sequence, startOffset);
 
     String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
     LLCSegmentName newLLCSegmentName =
-        new LLCSegmentName(rawTableName, partitionGroupId, 
STARTING_SEQUENCE_NUMBER, creationTimeMs);
+        new LLCSegmentName(rawTableName, partitionGroupId, sequence, 
creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
-    CommittingSegmentDescriptor committingSegmentDescriptor = new 
CommittingSegmentDescriptor(null, startOffset, 0);
+    CommittingSegmentDescriptor committingSegmentDescriptor = new 
CommittingSegmentDescriptor(null,
+            startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, 
creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitions, 
numReplicas);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
new file mode 100644
index 00000000000..abbfa71e3ac
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class PinotTableRestletResourceTest {
+
+  @Test
+  public void testTweakRealtimeTableConfig() throws Exception {
+    try (InputStream inputStream = 
Thread.currentThread().getContextClassLoader()
+        
.getResourceAsStream("table/table_config_with_instance_assignment.json")) {
+      ObjectNode tableConfig = (ObjectNode) 
JsonUtils.inputStreamToJsonNode(inputStream);
+
+      String brokerTenant = "testBroker";
+      String serverTenant = "testServer";
+      CopyTablePayload copyTablePayload = new 
CopyTablePayload("http://localhost:9000";, null, brokerTenant,
+          serverTenant, Map.of("server1_REALTIME", "testServer_REALTIME"));
+      PinotTableRestletResource.tweakRealtimeTableConfig(tableConfig, 
copyTablePayload);
+
+      assertEquals(tableConfig.get("tenants").get("broker").asText(), 
brokerTenant);
+      assertEquals(tableConfig.get("tenants").get("server").asText(), 
serverTenant);
+      
assertEquals(tableConfig.path("instanceAssignmentConfigMap").path("CONSUMING").path("tagPoolConfig").path("tag")
+          .asText(), serverTenant + "_REALTIME");
+    }
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index ef4f4c062d6..f4f876e4a55 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core;
 
 import com.google.common.collect.BiMap;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,6 +32,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixAdmin;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.model.ClusterConfig;
@@ -58,6 +61,7 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.api.resources.InstanceInfo;
 import org.apache.pinot.controller.helix.ControllerTest;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -75,6 +79,10 @@ import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Segment;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
@@ -88,6 +96,12 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
 
@@ -1596,10 +1610,115 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
     assertEquals(actualSet, new HashSet<>(Arrays.asList(expected)));
   }
 
  /**
   * Verifies that {@code getConsumerWatermarks} throws for a missing table and, for an existing
   * table, converts the partition-group consumption statuses into watermarks: a DONE partition
   * advances to sequence+1 with the end offset; an in-progress partition keeps its sequence and
   * start offset. The LLC segment manager and HelixAdmin are swapped out via reflection and
   * restored afterwards, so statement order here matters.
   */
  @Test
  public void testGetConsumerWatermarks()
      throws Exception {
    String rawTableName = "watermarksTable";
    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);

    // Test table not found: must fail before the table is created.
    assertThrows(TableNotFoundException.class, () -> _helixResourceManager.getConsumerWatermarks(rawTableName));

    // Setup table (schema first, then a low-level real-time table config).
    addDummySchema(rawTableName);
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setBrokerTenant(BROKER_TENANT_NAME)
            .setServerTenant(SERVER_TENANT_NAME)
            .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
    _helixResourceManager.addTable(tableConfig);

    // Setup mocks: replace the private LLC segment manager with a mock, keeping the original
    // so it can be restored at the end of the test.
    PinotLLCRealtimeSegmentManager mockSegmentManager = mock(PinotLLCRealtimeSegmentManager.class);
    Field llcManagerField = PinotHelixResourceManager.class.getDeclaredField("_pinotLLCRealtimeSegmentManager");
    llcManagerField.setAccessible(true);
    PinotLLCRealtimeSegmentManager originalLlcManager =
        (PinotLLCRealtimeSegmentManager) llcManagerField.get(_helixResourceManager);
    llcManagerField.set(_helixResourceManager, mockSegmentManager);

    // Spy on the real HelixAdmin so only getResourceIdealState for this table is stubbed.
    Field helixAdminField = PinotHelixResourceManager.class.getDeclaredField("_helixAdmin");
    helixAdminField.setAccessible(true);
    HelixAdmin originalHelixAdmin = (HelixAdmin) helixAdminField.get(_helixResourceManager);
    HelixAdmin spyHelixAdmin = spy(originalHelixAdmin);
    helixAdminField.set(_helixResourceManager, spyHelixAdmin);

    IdealState idealState = new IdealState(realtimeTableName);
    doReturn(idealState).when(spyHelixAdmin).getResourceIdealState(any(), eq(realtimeTableName));

    // Test happy path: "done" is lower-case on purpose — the status check is case-insensitive.
    PartitionGroupConsumptionStatus doneStatus = new PartitionGroupConsumptionStatus(0, 100,
        new LongMsgOffset(123), new LongMsgOffset(456), "done");
    PartitionGroupConsumptionStatus inProgressStatus =
        new PartitionGroupConsumptionStatus(1, 200, new LongMsgOffset(789), null, "IN_PROGRESS");
    when(mockSegmentManager.getPartitionGroupConsumptionStatusList(any(), any()))
        .thenReturn(Arrays.asList(doneStatus, inProgressStatus));

    WatermarkInductionResult waterMarkInductionResult = _helixResourceManager.getConsumerWatermarks(rawTableName);
    assertEquals(waterMarkInductionResult.getWatermarks().size(), 2);
    // DONE partition: sequence advanced from 100 to 101, watermark is the end offset (456).
    WatermarkInductionResult.Watermark doneWatermark = waterMarkInductionResult.getWatermarks().get(0);
    assertEquals(doneWatermark.getPartitionGroupId(), 0);
    assertEquals(doneWatermark.getSequenceNumber(), 101L);
    assertEquals(doneWatermark.getOffset(), 456L);
    // In-progress partition: sequence (200) and start offset (789) are reported unchanged.
    WatermarkInductionResult.Watermark inProgressWatermark = waterMarkInductionResult.getWatermarks().get(1);
    assertEquals(inProgressWatermark.getPartitionGroupId(), 1);
    assertEquals(inProgressWatermark.getSequenceNumber(), 200L);
    assertEquals(inProgressWatermark.getOffset(), 789L);

    // recover the original values so other tests see the untouched resource manager
    helixAdminField.set(_helixResourceManager, originalHelixAdmin);
    llcManagerField.set(_helixResourceManager, originalLlcManager);
    // Cleanup
    _helixResourceManager.deleteRealtimeTable(rawTableName);
    deleteSchema(rawTableName);
  }
+
  /** Stops the fake instances, the controller and ZooKeeper once all tests in this class finish. */
  @AfterClass
  public void tearDown() {
    stopFakeInstances();
    stopController();
    stopZk();
  }
+
+  @Test
+  public void testAddRealtimeTableWithConsumingMetadata()
+      throws Exception {
+    final String rawTableName = "testTable2";
+    final String realtimeTableName = rawTableName + "_REALTIME";
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setBrokerTenant(BROKER_TENANT_NAME)
+            .setServerTenant(SERVER_TENANT_NAME)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+    waitForEVToDisappear(tableConfig.getTableName());
+    addDummySchema(rawTableName);
+
+    List<Pair<PartitionGroupMetadata, Integer>> consumingMetadata = new 
ArrayList<>();
+    PartitionGroupMetadata metadata0 = mock(PartitionGroupMetadata.class);
+    when(metadata0.getPartitionGroupId()).thenReturn(0);
+    
when(metadata0.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
+    consumingMetadata.add(Pair.of(metadata0, 5));
+    PartitionGroupMetadata metadata1 = mock(PartitionGroupMetadata.class);
+    when(metadata1.getPartitionGroupId()).thenReturn(1);
+    
when(metadata1.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
+    consumingMetadata.add(Pair.of(metadata1, 10));
+
+    _helixResourceManager.addTable(tableConfig, consumingMetadata);
+
+    IdealState idealState = 
_helixResourceManager.getTableIdealState(realtimeTableName);
+    assertNotNull(idealState);
+    assertEquals(idealState.getPartitionSet().size(), 2);
+
+    for (String segmentName : idealState.getPartitionSet()) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      int partitionGroupId = llcSegmentName.getPartitionGroupId();
+      if (partitionGroupId == 0) {
+        assertEquals(llcSegmentName.getSequenceNumber(), 5);
+      } else if (partitionGroupId == 1) {
+        assertEquals(llcSegmentName.getSequenceNumber(), 10);
+      } else {
+        fail("Unexpected partition group id: " + partitionGroupId);
+      }
+    }
+
+    _helixResourceManager.deleteRealtimeTable(rawTableName);
+    deleteSchema(rawTableName);
+  }
 }
diff --git 
a/pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json
 
b/pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json
new file mode 100644
index 00000000000..cf09d4e5f40
--- /dev/null
+++ 
b/pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json
@@ -0,0 +1,15 @@
+{
+  "tenants": {
+    "broker": "broker1",
+    "server": "server1"
+  },
+  "instanceAssignmentConfigMap": {
+    "CONSUMING": {
+      "tagPoolConfig": {
+        "tag": "server1_REALTIME",
+        "poolBased": true,
+        "numPools": 2
+      }
+    }
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 12fa14bcc82..a7fa0901982 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -347,6 +347,10 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "tables", tableName, "schema");
   }
 
+  public String forConsumerWatermarksGet(String tableName) {
+    return StringUtil.join("/", _baseUrl, "tables", tableName, 
"consumerWatermarks");
+  }
+
  /** Returns the controller URL for fetching the external view of the given table. */
  public String forTableExternalView(String tableName) {
    return StringUtil.join("/", _baseUrl, "tables", tableName, "externalview");
  }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilderTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilderTest.java
new file mode 100644
index 00000000000..88272869389
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilderTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.builder;
+
+import org.apache.pinot.spi.utils.StringUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class ControllerRequestURLBuilderTest {
+
+  @Test
+  public void testForConsumerWatermarksGet() {
+    ControllerRequestURLBuilder builder = 
ControllerRequestURLBuilder.baseUrl("http://localhost:9000";);
+    assertEquals(builder.forConsumerWatermarksGet("myTable"),
+        StringUtil.join("/", "http://localhost:9000";, "tables", "myTable", 
"consumerWatermarks"));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to