This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4cf67652b6d [Real-time Table Replication X clusters][1/n] Creating new
tables with designated consuming segments (#17235)
4cf67652b6d is described below
commit 4cf67652b6d30864c7e475c27c741ec8a85b5791
Author: Xuanyi Li <[email protected]>
AuthorDate: Thu Jan 15 14:56:14 2026 -0800
[Real-time Table Replication X clusters][1/n] Creating new tables with
designated consuming segments (#17235)
* [Pinot Copy Table] induct the watermark of consuming status
* [Pinot Copy Table] create first consuming segments per partition
* [Pinot Copy Table] copy table with designated tenant and watermarks
Reviewers: #ldap_rta-group, ureview, muller.liu
Reviewed By: #ldap_rta-group, muller.liu
Tags: #has_java
JIRA Issues: EVA-12009
Differential Revision: https://code.uberinternal.com/D20152399
* Add licenses
* rename watermark endpoint; private copyTablePayload
* fix the unit test
* Add logs for copying consuming segments
* add more logs
* add http:// to the uri
* instanceAssignmentConfig tag compatible with lower jackson version
* make test tenant name more generic
* enforce user input http:// or https://
* null check or comment why skip null check for mandatory field
* make private field for watermarkResults
* offline, upsert not supported
* cosmetic change with more comments
* postpone schema validation
* check table existence at the beginning
* add consumerWatermarks url builder
* add schema and realtime table config into the response
* introduce dryrun mode
* add license
* mvn spotless:apply
* remove redundant log
* move knobs to path parameter
* Fix JavaDoc inconsistencies for consumer watermarks API
- Updated PinotRealtimeTableResource.java: Changed misleading JavaDoc from
"Get table ideal state" to accurately describe the consumer watermarks
endpoint
- Updated PinotHelixResourceManager.java: Added explicit documentation
clarifying
that the API is restricted to realtime tables only and works with both
upsert
and non-upsert tables
* Replace PartitionGroupInfo with Pair<PartitionGroupMetadata, Integer>
- Updated PinotHelixResourceManager.addTable() to accept
List<Pair<PartitionGroupMetadata, Integer>>
- Updated PinotLLCRealtimeSegmentManager.setUpNewTable() to use Pair
instead of PartitionGroupInfo
- Refactored setupNewPartitionGroup() to accept metadata and sequence
separately
- Updated PinotTableRestletResource copyTable endpoint to create Pair
instances
- Updated unit tests in PinotHelixResourceManagerStatelessTest and
PartitionGroupInfoTest
- Added necessary imports for PartitionGroupMetadata and Pair
* Remove unused PartitionGroupInfo class and test
- Deleted PartitionGroupInfo.java as it has been replaced with
Pair<PartitionGroupMetadata, Integer>
- Deleted PartitionGroupInfoTest.java as the class is no longer needed
- Verified no other classes reference PartitionGroupInfo
* javadoc
---
.../controller/api/resources/CopyTablePayload.java | 88 +++++++++++++
.../api/resources/CopyTableResponse.java | 99 ++++++++++++++
.../api/resources/PinotRealtimeTableResource.java | 22 ++++
.../api/resources/PinotTableRestletResource.java | 142 +++++++++++++++++++++
.../helix/core/PinotHelixResourceManager.java | 79 +++++++++++-
.../helix/core/WatermarkInductionResult.java | 111 ++++++++++++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 42 ++++--
.../resources/PinotTableRestletResourceTest.java | 49 +++++++
.../PinotHelixResourceManagerStatelessTest.java | 119 +++++++++++++++++
.../table_config_with_instance_assignment.json | 15 +++
.../utils/builder/ControllerRequestURLBuilder.java | 4 +
.../builder/ControllerRequestURLBuilderTest.java | 34 +++++
12 files changed, 793 insertions(+), 11 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java
new file mode 100644
index 00000000000..8583ff36a3c
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+ private String _sourceClusterUri;
+ private Map<String, String> _headers;
+ /**
+ * Broker tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+ */
+ private String _brokerTenant;
+ /**
+ * Server tenant for the new table.
+ * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+ */
+ private String _serverTenant;
+
+ /**
+ * The instanceAssignmentConfig's tagPoolConfig contains full tenant name.
We will use this field to let user specify
+ * the replacement relation from source cluster's full tenant to target
cluster's full tenant.
+ */
+ private Map<String, String> _tagPoolReplacementMap;
+
+ @JsonCreator
+ public CopyTablePayload(
+ @JsonProperty(value = "sourceClusterUri", required = true) String
sourceClusterUri,
+ @JsonProperty("sourceClusterHeaders") Map<String, String> headers,
+ @JsonProperty(value = "brokerTenant", required = true) String
brokerTenant,
+ @JsonProperty(value = "serverTenant", required = true) String
serverTenant,
+ @JsonProperty("tagPoolReplacementMap") @Nullable Map<String, String>
tagPoolReplacementMap) {
+ _sourceClusterUri = sourceClusterUri;
+ _headers = headers;
+ _brokerTenant = brokerTenant;
+ _serverTenant = serverTenant;
+ _tagPoolReplacementMap = tagPoolReplacementMap;
+ }
+
+ @JsonGetter("sourceClusterUri")
+ public String getSourceClusterUri() {
+ return _sourceClusterUri;
+ }
+
+ @JsonGetter("sourceClusterHeaders")
+ public Map<String, String> getHeaders() {
+ return _headers;
+ }
+
+ @JsonGetter("brokerTenant")
+ public String getBrokerTenant() {
+ return _brokerTenant;
+ }
+
+ @JsonGetter("serverTenant")
+ public String getServerTenant() {
+ return _serverTenant;
+ }
+
+ @JsonGetter("tagPoolReplacementMap")
+ public Map<String, String> getTagPoolReplacementMap() {
+ return _tagPoolReplacementMap;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java
new file mode 100644
index 00000000000..91b11d5a41d
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CopyTableResponse {
+ @JsonProperty("msg")
+ private String _msg;
+
+ @JsonProperty("status")
+ private String _status;
+
+ @JsonProperty("schema")
+ private Schema _schema;
+
+ @JsonProperty("realtimeTableConfig")
+ private TableConfig _tableConfig;
+
+ @JsonProperty("watermarkInductionResult")
+ private WatermarkInductionResult _watermarkInductionResult;
+
+ @JsonCreator
+ public CopyTableResponse(@JsonProperty("status") String status,
@JsonProperty("msg") String msg,
+ @JsonProperty("schema") @Nullable Schema schema,
+ @JsonProperty("realtimeTableConfig") @Nullable TableConfig tableConfig,
+ @JsonProperty("watermarkInductionResult") @Nullable
WatermarkInductionResult watermarkInductionResult) {
+ _status = status;
+ _msg = msg;
+ _schema = schema;
+ _tableConfig = tableConfig;
+ _watermarkInductionResult = watermarkInductionResult;
+ }
+
+ public String getMsg() {
+ return _msg;
+ }
+
+ public void setMsg(String msg) {
+ _msg = msg;
+ }
+
+ public String getStatus() {
+ return _status;
+ }
+
+ public void setStatus(String status) {
+ _status = status;
+ }
+
+ public Schema getSchema() {
+ return _schema;
+ }
+
+ public void setSchema(Schema schema) {
+ _schema = schema;
+ }
+
+ public TableConfig getTableConfig() {
+ return _tableConfig;
+ }
+
+ public void setTableConfig(TableConfig tableConfig) {
+ _tableConfig = tableConfig;
+ }
+
+ public WatermarkInductionResult getWatermarkInductionResult() {
+ return _watermarkInductionResult;
+ }
+
+ public void setWatermarkInductionResult(
+ WatermarkInductionResult watermarkInductionResult) {
+ _watermarkInductionResult = watermarkInductionResult;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 2e7ad9b880a..00219cf7a34 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -53,10 +53,12 @@ import javax.ws.rs.core.Response;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
@@ -449,6 +451,26 @@ public class PinotRealtimeTableResource {
}
}
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/tables/{tableName}/consumerWatermarks")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_IDEAL_STATE)
+ @ApiOperation(value = "Get consumer watermarks for a realtime table",
+ notes = "Returns the next offset to be consumed for each partition
group. Only works for realtime tables.")
+ public WatermarkInductionResult getConsumerWatermark(
+ @ApiParam(value = "Name of the realtime table", required = true)
@PathParam("tableName") String tableName,
+ @Context HttpHeaders headers) {
+ try {
+ String table = DatabaseUtils.translateTableName(tableName, headers);
+ return _pinotHelixResourceManager.getConsumerWatermarks(table);
+ } catch (TableNotFoundException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND, e);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
private Map<String, Set<String>> getInstanceToErrorSegmentsMap(String
tableNameWithType) {
ExternalView externalView =
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
Preconditions.checkState(externalView != null, "External view does not
exist for table: " + tableNameWithType);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 2175394555b..b749fddc4b2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import io.swagger.annotations.Api;
@@ -36,6 +37,7 @@ import it.unimi.dsi.fastutil.Arrays;
import it.unimi.dsi.fastutil.Swapper;
import it.unimi.dsi.fastutil.ints.IntComparator;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -86,7 +88,9 @@ import
org.apache.pinot.common.restlet.resources.TableSegmentValidationInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LogicalTableConfigUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
@@ -96,6 +100,7 @@ import
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.exception.TableAlreadyExistsException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -121,9 +126,12 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.zookeeper.data.Stat;
@@ -283,6 +291,140 @@ public class PinotTableRestletResource {
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @ApiParam(value = "Include verbose information in response")
+ @QueryParam("verbose") @DefaultValue("false") boolean verbose,
+ @ApiParam(value = "Dry run mode") @QueryParam("dryRun")
@DefaultValue("true") boolean dryRun,
+ @Context HttpHeaders headers) {
+ try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+ if
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
!= null
+ ||
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
!= null) {
+ throw new TableAlreadyExistsException("Table config for " + tableName
+ + " already exists. If this is unexpected, try deleting the table
to remove all metadata associated"
+ + " with it before attempting to recreate.");
+ }
+
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ ControllerRequestURLBuilder urlBuilder =
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+ URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+
+ URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ URI watermarkUri = new
URI(urlBuilder.forConsumerWatermarksGet(tableName));
+ SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+ String watermarkJson = watermarkResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}",
tableName, watermarkJson);
+ WatermarkInductionResult watermarkInductionResult =
+ JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+ boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+ boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
+ if (hasOffline && !hasRealtime) {
+ throw new IllegalStateException("pure offline table copy not supported
yet");
+ }
+
+ ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+ tweakRealtimeTableConfig(realtimeTableConfigNode, copyTablePayload);
+ TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+ if (realtimeTableConfig.getUpsertConfig() != null) {
+ throw new IllegalStateException("upsert table copy not supported");
+ }
+ LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}", tableName);
+
+ if (dryRun) {
+ return new CopyTableResponse("success", "Dry run", schema,
realtimeTableConfig, watermarkInductionResult);
+ }
+
+ List<Pair<PartitionGroupMetadata, Integer>> partitionGroupInfos =
watermarkInductionResult.getWatermarks()
+ .stream()
+ .map(watermark -> Pair.of(
+ new PartitionGroupMetadata(watermark.getPartitionGroupId(), new
LongMsgOffset(watermark.getOffset())),
+ watermark.getSequenceNumber()))
+ .collect(Collectors.toList());
+
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {}",
tableName);
+ // Add the table with designated starting kafka offset and segment
sequence number to create consuming segments
+ _pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ LOGGER.info("[copyTable] Successfully added table config: {} with
designated high watermark", tableName);
+ CopyTableResponse response = new CopyTableResponse("success", "Table
copied successfully", null, null, null);
+ if (hasOffline) {
+ response = new CopyTableResponse("warn", "detect offline too; it will
only copy real-time segments",
+ null, null, null);
+ }
+ if (verbose) {
+ response.setSchema(schema);
+ response.setTableConfig(realtimeTableConfig);
+ response.setWatermarkInductionResult(watermarkInductionResult);
+ }
+ return response;
+ } catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Helper method to tweak the realtime table config. This method is used to
set the broker and server tenants, and
+ * optionally replace the pool tags in the instance assignment config.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param copyTablePayload The payload containing tenant and tag pool
replacement information.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
CopyTablePayload copyTablePayload) {
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map<String, String> tagPoolReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ ObjectNode tenantConfig = (ObjectNode)
realtimeTableConfigNode.get("tenants");
+ tenantConfig.put("broker", brokerTenant);
+ tenantConfig.put("server", serverTenant);
+ if (tagPoolReplacementMap == null || tagPoolReplacementMap.isEmpty()) {
+ return;
+ }
+ JsonNode instanceAssignmentConfigMap =
realtimeTableConfigNode.get("instanceAssignmentConfigMap");
+ if (instanceAssignmentConfigMap == null) {
+ return;
+ }
+ java.util.Iterator<Map.Entry<String, JsonNode>> iterator =
instanceAssignmentConfigMap.fields();
+ while (iterator.hasNext()) {
+ Map.Entry<String, JsonNode> entry = iterator.next();
+ JsonNode instanceAssignmentConfig = entry.getValue();
+ ObjectNode tagPoolConfig = (ObjectNode)
instanceAssignmentConfig.get("tagPoolConfig");
+ String srcTag = tagPoolConfig.get("tag").asText();
+ if (tagPoolReplacementMap.containsKey(srcTag)) {
+ tagPoolConfig.put("tag", tagPoolReplacementMap.get(srcTag));
+ }
+ }
+ }
+
@PUT
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/recommender")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ef65f6abcfb..78046efc84d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -153,6 +153,8 @@ import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentMa
import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.workload.QueryWorkloadManager;
+import org.apache.pinot.core.util.NumberUtils;
+import org.apache.pinot.core.util.NumericException;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
@@ -172,6 +174,9 @@ import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -1725,12 +1730,31 @@ public class PinotHelixResourceManager {
/**
* Performs validations of table config and adds the table to zookeeper
* @throws InvalidTableConfigException if validations fail
- * @throws TableAlreadyExistsException for offline tables only if the table
already exists
+ * @throws TableAlreadyExistsException if the table already exists
*/
public void addTable(TableConfig tableConfig)
throws IOException {
+ addTable(tableConfig, Collections.emptyList());
+ }
+
+ /**
+ * Performs validations of table config and adds the table to zookeeper
+ * <p>Call this api when you wanted to create a realtime table with
consuming segments starting to ingest from
+ * designated offset and being assigned with a segment sequence number per
partition. Otherwise, you should
+ * directly call the {@link #addTable(TableConfig)} which will further call
this api with an empty list.
+ * @param tableConfig The config for the table to be created.
+ * @param consumeMeta A list of pairs, where each pair contains the
partition group metadata and the initial sequence
+ * number for a consuming segment. This is used to start
ingestion from a specific offset.
+ * @throws InvalidTableConfigException if validations fail
+ * @throws TableAlreadyExistsException if the table already exists
+ */
+ public void addTable(TableConfig tableConfig,
List<Pair<PartitionGroupMetadata, Integer>> consumeMeta)
+ throws IOException {
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Adding table {}: Start", tableNameWithType);
+ if (consumeMeta != null && !consumeMeta.isEmpty()) {
+ LOGGER.info("Adding table {} with {} partition group infos",
tableNameWithType, consumeMeta.size());
+ }
if (getTableConfig(tableNameWithType) != null) {
throw new TableAlreadyExistsException("Table config for " +
tableNameWithType
@@ -1789,10 +1813,16 @@ public class PinotHelixResourceManager {
// Add ideal state
_helixAdmin.addResource(_helixClusterName, tableNameWithType,
idealState);
LOGGER.info("Adding table {}: Added ideal state for offline table",
tableNameWithType);
- } else {
+ } else if (consumeMeta == null || consumeMeta.isEmpty()) {
// Add ideal state with the first CONSUMING segment
_pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
LOGGER.info("Adding table {}: Added ideal state with first consuming
segment", tableNameWithType);
+ } else {
+ // Add ideal state with the first CONSUMING segment with designated
partition consuming metadata
+ // Add ideal state with the first CONSUMING segment
+ _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState,
consumeMeta);
+ LOGGER.info("Adding table {}: Added consuming segments ideal state
given the designated consuming metadata",
+ tableNameWithType);
}
} catch (Exception e) {
LOGGER.error("Caught exception while setting up table: {}, cleaning it
up", tableNameWithType, e);
@@ -4811,6 +4841,51 @@ public class PinotHelixResourceManager {
return _queryWorkloadManager;
}
+ /**
+ * Retrieves the consumer watermark for a given real-time table.
+ * <p>The watermark represents the next offset to be consumed for each
partition group.
+ * If the latest segment of a partition is in a DONE state, the watermark is
the end offset of the completed segment.
+ * Otherwise, it is the start offset of the current consuming segment.
+ *
+ * @param tableName The name of the real-time table (without type suffix).
+ * @return A {@link WatermarkInductionResult} containing a list of
watermarks for each partition group.
+ * @throws TableNotFoundException if the specified real-time table does not
exist.
+ * @throws IllegalStateException if the IdealState for the table is not
found.
+ */
+ public WatermarkInductionResult getConsumerWatermarks(String tableName)
throws TableNotFoundException {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ if (!hasRealtimeTable(tableName)) {
+ throw new TableNotFoundException("Table " + tableNameWithType + " does
not exist");
+ }
+ TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType +
"exists but null tableConfig");
+ List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+ IdealState idealState = _helixAdmin
+ .getResourceIdealState(getHelixClusterName(), tableNameWithType);
+ if (idealState == null) {
+ throw new IllegalStateException("Null IdealState of the table " +
tableNameWithType);
+ }
+ List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+ .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+ List<WatermarkInductionResult.Watermark> watermarks =
lst.stream().map(status -> {
+ int seq = status.getSequenceNumber();
+ long startOffset;
+ try {
+ if ("DONE".equalsIgnoreCase(status.getStatus())) {
+ Preconditions.checkNotNull(status.getEndOffset());
+ startOffset =
NumberUtils.parseLong(status.getEndOffset().toString());
+ seq++;
+ } else {
+ startOffset =
NumberUtils.parseLong(status.getStartOffset().toString());
+ }
+ } catch (NumericException e) {
+ throw new RuntimeException(e);
+ }
+ return new
WatermarkInductionResult.Watermark(status.getPartitionGroupId(), seq,
startOffset);
+ }).collect(Collectors.toList());
+ return new WatermarkInductionResult(watermarks);
+ }
+
/*
* Uncomment and use for testing on a real cluster
public static void main(String[] args) throws Exception {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java
new file mode 100644
index 00000000000..4d6a4aa8da0
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list
of watermarks.
+ */
+public class WatermarkInductionResult {
+
+ private List<Watermark> _watermarks;
+
+ /**
+ * The @JsonCreator annotation marks this constructor to be used for
deserializing
+ * a JSON object back into a WatermarkInductionResult object.
+ *
+ * @param watermarks The list of watermarks.
+ */
+ @JsonCreator
+ public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark>
watermarks) {
+ _watermarks = watermarks;
+ }
+
+ /**
+ * Gets the list of watermarks.
+ *
+ * @return The list of watermarks.
+ */
+ @JsonGetter("watermarks")
+ public List<Watermark> getWatermarks() {
+ return _watermarks;
+ }
+
+ /**
+ * Represents a single watermark with its partitionGroupId, sequence, and
offset.
+ */
+ public static class Watermark {
+ private int _partitionGroupId;
+ private int _sequenceNumber;
+ private long _offset;
+
+ /**
+ * The @JsonCreator annotation tells Jackson to use this constructor to
create
+ * a Watermark instance from a JSON object. The @JsonProperty annotations
+ * map the keys in the JSON object to the constructor parameters.
+ *
+ * @param partitionGroupId The ID of the partition group.
+ * @param sequenceNumber The segment sequence number of the consuming
segment.
+ * @param offset The first Kafka offset whose corresponding record has
not yet been sealed in Pinot
+ */
+ @JsonCreator
+ public Watermark(@JsonProperty("partitionGroupId") int partitionGroupId,
+ @JsonProperty("sequenceNumber") int sequenceNumber,
@JsonProperty("offset") long offset) {
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _offset = offset;
+ }
+
+ /**
+ * Gets the partition group ID.
+ *
+ * @return The partition group ID.
+ */
+ @JsonGetter("partitionGroupId")
+ public int getPartitionGroupId() {
+ return _partitionGroupId;
+ }
+
+ /**
+ * Gets the segment sequence number of the most recent consuming segment.
+ *
+ * @return The segment sequence number.
+ */
+ @JsonGetter("sequenceNumber")
+ public int getSequenceNumber() {
+ return _sequenceNumber;
+ }
+
+ /**
+ * The high-watermark of the Kafka offsets. Any Kafka record with an
offset greater than or equal to this
+ * value has not yet been sealed.
+ *
+ * @return The offset.
+ */
+ @JsonGetter("offset")
+ public long getOffset() {
+ return _offset;
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5b0619a7ca9..55252182dd6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -56,6 +56,7 @@ import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.HelixAdmin;
@@ -376,6 +377,20 @@ public class PinotLLCRealtimeSegmentManager {
* <p>NOTE: the passed in IdealState may contain HLC segments if both HLC
and LLC are configured.
*/
public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
+ List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
+ List<Pair<PartitionGroupMetadata, Integer>> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(), idealState).stream().map(
+ x -> Pair.of(x, STARTING_SEQUENCE_NUMBER)
+ ).collect(Collectors.toList());
+ setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
+ }
+
+ /**
+ * Sets up the initial segments for a new LLC real-time table.
+ * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC
and LLC are configured.
+ */
+ public void setUpNewTable(TableConfig tableConfig, IdealState idealState,
+ List<Pair<PartitionGroupMetadata, Integer>> consumeMeta) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
@@ -384,9 +399,7 @@ public class PinotLLCRealtimeSegmentManager {
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(), idealState);
- int numPartitionGroups = newPartitionGroupMetadataList.size();
+ int numPartitionGroups = consumeMeta.size();
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
SegmentAssignment segmentAssignment =
@@ -396,11 +409,13 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
- for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ for (Pair<PartitionGroupMetadata, Integer> pair : consumeMeta) {
+ PartitionGroupMetadata metadata = pair.getLeft();
+ int sequence = pair.getRight();
StreamConfig streamConfig =
IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs,
- partitionGroupMetadata.getPartitionGroupId());
+ metadata.getPartitionGroupId());
String segmentName =
- setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupMetadata, currentTimeMs, instancePartitions,
+ setupNewPartitionGroup(tableConfig, streamConfig, metadata,
sequence, currentTimeMs, instancePartitions,
numPartitionGroups, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
segmentName, segmentAssignment,
instancePartitionsMap);
@@ -1903,17 +1918,26 @@ public class PinotLLCRealtimeSegmentManager {
private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig
streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs,
InstancePartitions instancePartitions,
int numPartitions, int numReplicas) {
+ return setupNewPartitionGroup(tableConfig, streamConfig,
partitionGroupMetadata, STARTING_SEQUENCE_NUMBER,
+ creationTimeMs, instancePartitions, numPartitions, numReplicas);
+ }
+
+ private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig
streamConfig,
+ PartitionGroupMetadata partitionGroupMetadata, int sequence, long
creationTimeMs,
+ InstancePartitions instancePartitions, int numPartitions, int
numReplicas) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
String startOffset = partitionGroupMetadata.getStartOffset().toString();
- LOGGER.info("Setting up new partition group: {} for table: {}",
partitionGroupId, realtimeTableName);
+ LOGGER.info("Setting up new partition group: {} for table: {} with
sequence: {} and startOffset: {}",
+ partitionGroupId, realtimeTableName, sequence, startOffset);
String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
LLCSegmentName newLLCSegmentName =
- new LLCSegmentName(rawTableName, partitionGroupId,
STARTING_SEQUENCE_NUMBER, creationTimeMs);
+ new LLCSegmentName(rawTableName, partitionGroupId, sequence,
creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null, startOffset, 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null,
+ startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitions,
numReplicas);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
new file mode 100644
index 00000000000..abbfa71e3ac
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.InputStream;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class PinotTableRestletResourceTest {
+
+ @Test
+ public void testTweakRealtimeTableConfig() throws Exception {
+ try (InputStream inputStream =
Thread.currentThread().getContextClassLoader()
+
.getResourceAsStream("table/table_config_with_instance_assignment.json")) {
+ ObjectNode tableConfig = (ObjectNode)
JsonUtils.inputStreamToJsonNode(inputStream);
+
+ String brokerTenant = "testBroker";
+ String serverTenant = "testServer";
+ CopyTablePayload copyTablePayload = new
CopyTablePayload("http://localhost:9000", null, brokerTenant,
+ serverTenant, Map.of("server1_REALTIME", "testServer_REALTIME"));
+ PinotTableRestletResource.tweakRealtimeTableConfig(tableConfig,
copyTablePayload);
+
+ assertEquals(tableConfig.get("tenants").get("broker").asText(),
brokerTenant);
+ assertEquals(tableConfig.get("tenants").get("server").asText(),
serverTenant);
+
assertEquals(tableConfig.path("instanceAssignmentConfigMap").path("CONSUMING").path("tagPoolConfig").path("tag")
+ .asText(), serverTenant + "_REALTIME");
+ }
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index ef4f4c062d6..f4f876e4a55 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core;
import com.google.common.collect.BiMap;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -31,6 +32,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixAdmin;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.model.ClusterConfig;
@@ -58,6 +61,7 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.api.resources.InstanceInfo;
import org.apache.pinot.controller.helix.ControllerTest;
+import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -75,6 +79,10 @@ import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Segment;
import org.apache.pinot.spi.utils.CommonConstants.Server;
@@ -88,6 +96,12 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -1596,10 +1610,115 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
assertEquals(actualSet, new HashSet<>(Arrays.asList(expected)));
}
+ @Test
+ public void testGetConsumerWatermarks()
+ throws Exception {
+ String rawTableName = "watermarksTable";
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+ // Test table not found
+ assertThrows(TableNotFoundException.class, () ->
_helixResourceManager.getConsumerWatermarks(rawTableName));
+
+ // Setup table
+ addDummySchema(rawTableName);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setBrokerTenant(BROKER_TENANT_NAME)
+ .setServerTenant(SERVER_TENANT_NAME)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+ _helixResourceManager.addTable(tableConfig);
+
+ // Setup mocks
+ PinotLLCRealtimeSegmentManager mockSegmentManager =
mock(PinotLLCRealtimeSegmentManager.class);
+ Field llcManagerField =
PinotHelixResourceManager.class.getDeclaredField("_pinotLLCRealtimeSegmentManager");
+ llcManagerField.setAccessible(true);
+ PinotLLCRealtimeSegmentManager originalLlcManager =
+ (PinotLLCRealtimeSegmentManager)
llcManagerField.get(_helixResourceManager);
+ llcManagerField.set(_helixResourceManager, mockSegmentManager);
+
+ Field helixAdminField =
PinotHelixResourceManager.class.getDeclaredField("_helixAdmin");
+ helixAdminField.setAccessible(true);
+ HelixAdmin originalHelixAdmin = (HelixAdmin)
helixAdminField.get(_helixResourceManager);
+ HelixAdmin spyHelixAdmin = spy(originalHelixAdmin);
+ helixAdminField.set(_helixResourceManager, spyHelixAdmin);
+
+ IdealState idealState = new IdealState(realtimeTableName);
+ doReturn(idealState).when(spyHelixAdmin).getResourceIdealState(any(),
eq(realtimeTableName));
+
+ // Test happy path
+ PartitionGroupConsumptionStatus doneStatus = new
PartitionGroupConsumptionStatus(0, 100,
+ new LongMsgOffset(123), new LongMsgOffset(456), "done");
+ PartitionGroupConsumptionStatus inProgressStatus =
+ new PartitionGroupConsumptionStatus(1, 200, new LongMsgOffset(789),
null, "IN_PROGRESS");
+ when(mockSegmentManager.getPartitionGroupConsumptionStatusList(any(),
any()))
+ .thenReturn(Arrays.asList(doneStatus, inProgressStatus));
+
+ WatermarkInductionResult waterMarkInductionResult =
_helixResourceManager.getConsumerWatermarks(rawTableName);
+ assertEquals(waterMarkInductionResult.getWatermarks().size(), 2);
+ WatermarkInductionResult.Watermark doneWatermark =
waterMarkInductionResult.getWatermarks().get(0);
+ assertEquals(doneWatermark.getPartitionGroupId(), 0);
+ assertEquals(doneWatermark.getSequenceNumber(), 101L);
+ assertEquals(doneWatermark.getOffset(), 456L);
+ WatermarkInductionResult.Watermark inProgressWatermark =
waterMarkInductionResult.getWatermarks().get(1);
+ assertEquals(inProgressWatermark.getPartitionGroupId(), 1);
+ assertEquals(inProgressWatermark.getSequenceNumber(), 200L);
+ assertEquals(inProgressWatermark.getOffset(), 789L);
+
+ // recover the original values
+ helixAdminField.set(_helixResourceManager, originalHelixAdmin);
+ llcManagerField.set(_helixResourceManager, originalLlcManager);
+ // Cleanup
+ _helixResourceManager.deleteRealtimeTable(rawTableName);
+ deleteSchema(rawTableName);
+ }
+
@AfterClass
public void tearDown() {
stopFakeInstances();
stopController();
stopZk();
}
+
+ @Test
+ public void testAddRealtimeTableWithConsumingMetadata()
+ throws Exception {
+ final String rawTableName = "testTable2";
+ final String realtimeTableName = rawTableName + "_REALTIME";
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setBrokerTenant(BROKER_TENANT_NAME)
+ .setServerTenant(SERVER_TENANT_NAME)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap()).build();
+ waitForEVToDisappear(tableConfig.getTableName());
+ addDummySchema(rawTableName);
+
+ List<Pair<PartitionGroupMetadata, Integer>> consumingMetadata = new
ArrayList<>();
+ PartitionGroupMetadata metadata0 = mock(PartitionGroupMetadata.class);
+ when(metadata0.getPartitionGroupId()).thenReturn(0);
+
when(metadata0.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
+ consumingMetadata.add(Pair.of(metadata0, 5));
+ PartitionGroupMetadata metadata1 = mock(PartitionGroupMetadata.class);
+ when(metadata1.getPartitionGroupId()).thenReturn(1);
+
when(metadata1.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
+ consumingMetadata.add(Pair.of(metadata1, 10));
+
+ _helixResourceManager.addTable(tableConfig, consumingMetadata);
+
+ IdealState idealState =
_helixResourceManager.getTableIdealState(realtimeTableName);
+ assertNotNull(idealState);
+ assertEquals(idealState.getPartitionSet().size(), 2);
+
+ for (String segmentName : idealState.getPartitionSet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ int partitionGroupId = llcSegmentName.getPartitionGroupId();
+ if (partitionGroupId == 0) {
+ assertEquals(llcSegmentName.getSequenceNumber(), 5);
+ } else if (partitionGroupId == 1) {
+ assertEquals(llcSegmentName.getSequenceNumber(), 10);
+ } else {
+ fail("Unexpected partition group id: " + partitionGroupId);
+ }
+ }
+
+ _helixResourceManager.deleteRealtimeTable(rawTableName);
+ deleteSchema(rawTableName);
+ }
}
diff --git
a/pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json
b/pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json
new file mode 100644
index 00000000000..cf09d4e5f40
--- /dev/null
+++
b/pinot-controller/src/test/resources/table/table_config_with_instance_assignment.json
@@ -0,0 +1,15 @@
+{
+ "tenants": {
+ "broker": "broker1",
+ "server": "server1"
+ },
+ "instanceAssignmentConfigMap": {
+ "CONSUMING": {
+ "tagPoolConfig": {
+ "tag": "server1_REALTIME",
+ "poolBased": true,
+ "numPools": 2
+ }
+ }
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 12fa14bcc82..a7fa0901982 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -347,6 +347,10 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "tables", tableName, "schema");
}
+ public String forConsumerWatermarksGet(String tableName) {
+ return StringUtil.join("/", _baseUrl, "tables", tableName,
"consumerWatermarks");
+ }
+
public String forTableExternalView(String tableName) {
return StringUtil.join("/", _baseUrl, "tables", tableName, "externalview");
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilderTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilderTest.java
new file mode 100644
index 00000000000..88272869389
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilderTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils.builder;
+
+import org.apache.pinot.spi.utils.StringUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class ControllerRequestURLBuilderTest {
+
+ @Test
+ public void testForConsumerWatermarksGet() {
+ ControllerRequestURLBuilder builder =
ControllerRequestURLBuilder.baseUrl("http://localhost:9000");
+ assertEquals(builder.forConsumerWatermarksGet("myTable"),
+ StringUtil.join("/", "http://localhost:9000", "tables", "myTable",
"consumerWatermarks"));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]