This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe46c3038ab [Feature] Develop LaunchBackfillIngestionJob to support complete backfill (#16890)
fe46c3038ab is described below
commit fe46c3038ab9f48408ecb5592b8732a270ea10cd
Author: Hongkun Xu <[email protected]>
AuthorDate: Wed Nov 5 00:19:23 2025 +0800
[Feature] Develop LaunchBackfillIngestionJob to support complete backfill (#16890)
* Develop LaunchBackfillIngestionJob to support complete backfill
Signed-off-by: Hongkun Xu <[email protected]>
* integrated with PinotAdminClient
Signed-off-by: Hongkun Xu <[email protected]>
* add comments and replace hard-coded parameters with constants
Signed-off-by: Hongkun Xu <[email protected]>
---------
Signed-off-by: Hongkun Xu <[email protected]>
---
.../pinot/client/admin/PinotAdminClient.java | 13 +
.../pinot/client/admin/PinotAdminTransport.java | 6 +-
.../pinot/client/admin/PinotSegmentApiClient.java | 76 +++++
pinot-tools/pom.xml | 4 +
.../pinot/tools/admin/PinotAdministrator.java | 2 +
.../command/LaunchBackfillIngestionJobCommand.java | 323 +++++++++++++++++++++
.../command/LaunchDataIngestionJobCommand.java | 16 +-
7 files changed, 430 insertions(+), 10 deletions(-)
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
index 701040111f4..0032a944475 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
@@ -44,6 +44,7 @@ public class PinotAdminClient implements AutoCloseable {
private PinotSegmentAdminClient _segmentClient;
private PinotTenantAdminClient _tenantClient;
private PinotTaskAdminClient _taskClient;
+ private PinotSegmentApiClient _segmentApiClient;
/**
* Creates a PinotAdminClient with the specified controller address.
@@ -124,6 +125,18 @@ public class PinotAdminClient implements AutoCloseable {
return _tableClient;
}
+ /**
+   * Gets the segment API client.
+   *
+   * @return Client for segment-level REST API operations
+ */
+ public PinotSegmentApiClient getSegmentApiClient() {
+ if (_segmentApiClient == null) {
+      _segmentApiClient = new PinotSegmentApiClient(_transport, _controllerAddress, _headers);
+ }
+ return _segmentApiClient;
+ }
+
/**
* Gets the schema administration client.
*
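
For reference, a minimal sketch of how a caller might obtain the new client (the
controller address, empty properties, and empty headers are illustrative
placeholders, not part of this patch):

    PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", new Properties(), Map.of());
    PinotSegmentApiClient segmentApiClient = adminClient.getSegmentApiClient();
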
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
index 1a3e680f039..934b59ec6a4 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
public class PinotAdminTransport implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotAdminTransport.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = "pinot.admin.request.timeout.ms";
+ public static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
/**
* Gets the ObjectMapper instance for JSON serialization/deserialization.
@@ -68,10 +70,10 @@ public class PinotAdminTransport implements AutoCloseable {
_defaultHeaders = authHeaders != null ? authHeaders : Map.of();
// Extract timeout configuration
-    _requestTimeoutMs = Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", "60000"));
+    _requestTimeoutMs = Integer.parseInt(properties.getProperty(ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, "60000"));
// Extract scheme (http/https)
-    String scheme = properties.getProperty("pinot.admin.scheme", CommonConstants.HTTP_PROTOCOL);
+    String scheme = properties.getProperty(ADMIN_TRANSPORT_SCHEME, CommonConstants.HTTP_PROTOCOL);
_scheme = scheme;
// Build HTTP client
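
With these keys exposed as constants, callers can configure the transport without
hard-coding property names. A minimal sketch (the values are illustrative):

    Properties transportProperties = new Properties();
    transportProperties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, "30000");
    transportProperties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, "https");
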
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java
new file mode 100644
index 00000000000..644ec26f57e
--- /dev/null
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * The <code>PinotSegmentApiClient</code> class provides an HTTP client for invoking segment REST APIs.
+ */
+public class PinotSegmentApiClient implements Closeable {
+ private final PinotAdminTransport _transport;
+ private final String _controllerAddress;
+ private final Map<String, String> _headers;
+
+  public PinotSegmentApiClient(PinotAdminTransport transport, String controllerAddress,
+ Map<String, String> headers) {
+ _transport = transport;
+ _controllerAddress = controllerAddress;
+ _headers = headers;
+ }
+
+ public static class QueryParameters {
+ public static final String TABLE_NAME = "tableName";
+ public static final String TYPE = "type";
+ public static final String START_TIMESTAMP = "startTimestamp";
+ public static final String END_TIMESTAMP = "endTimestamp";
+ public static final String EXCLUDE_OVERLAPPING = "excludeOverlapping";
+ }
+
+ private static final String SEGMENT_PATH = "/segments";
+ private static final String SELECT_PATH = "/select";
+ private static final String METADATA_PATH = "/metadata";
+
+  public JsonNode selectSegments(String rawTableName, String tableType, long startTimestamp, long endTimestamp,
+ boolean excludeOverlapping) throws PinotAdminException {
+ Map<String, String> queryParams = new HashMap<>();
+    queryParams.put(QueryParameters.START_TIMESTAMP, String.valueOf(startTimestamp));
+    queryParams.put(QueryParameters.END_TIMESTAMP, String.valueOf(endTimestamp));
+    queryParams.put(QueryParameters.EXCLUDE_OVERLAPPING, String.valueOf(excludeOverlapping));
+    queryParams.put(QueryParameters.TYPE, tableType);
+    return _transport.executeGet(_controllerAddress, SEGMENT_PATH + "/" + rawTableName + SELECT_PATH,
+        queryParams, _headers);
+ }
+
+  public JsonNode getSegmentMetadata(String tableNameWithType, String segmentName) throws PinotAdminException {
+    return _transport.executeGet(_controllerAddress,
+        SEGMENT_PATH + "/" + tableNameWithType + "/" + segmentName + METADATA_PATH, null, _headers);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _transport.close();
+ }
+}
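
Example usage of the two new endpoints, given a PinotAdminClient as above (the
table name, timestamps, and segment name are illustrative placeholders):

    PinotSegmentApiClient segmentApiClient = adminClient.getSegmentApiClient();
    // Select OFFLINE segments of "myTable" within the given epoch-millis window
    JsonNode selected = segmentApiClient.selectSegments("myTable", "OFFLINE", 1704067200000L, 1704153600000L, true);
    // Fetch metadata for a single segment, addressed by table-name-with-type and segment name
    JsonNode metadata = segmentApiClient.getSegmentMetadata("myTable_OFFLINE", "myTable_OFFLINE_0");
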
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index fdccae67156..b77dbb050ae 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -191,6 +191,10 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-java-client</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index b387f2cb604..be4f22ec33a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -43,6 +43,7 @@ import org.apache.pinot.tools.admin.command.GenerateDataCommand;
import org.apache.pinot.tools.admin.command.GitHubEventsQuickStartCommand;
import org.apache.pinot.tools.admin.command.ImportDataCommand;
import org.apache.pinot.tools.admin.command.JsonToPinotSchema;
+import org.apache.pinot.tools.admin.command.LaunchBackfillIngestionJobCommand;
import org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand;
import org.apache.pinot.tools.admin.command.LaunchSparkDataIngestionJobCommand;
import org.apache.pinot.tools.admin.command.MoveReplicaGroup;
@@ -100,6 +101,7 @@ public class PinotAdministrator {
SUBCOMMAND_MAP.put("OperateClusterConfig", new
OperateClusterConfigCommand());
SUBCOMMAND_MAP.put("GenerateData", new GenerateDataCommand());
SUBCOMMAND_MAP.put("LaunchDataIngestionJob", new
LaunchDataIngestionJobCommand());
+ SUBCOMMAND_MAP.put("LaunchBackfillIngestionJob", new
LaunchBackfillIngestionJobCommand());
SUBCOMMAND_MAP.put("LaunchSparkDataIngestionJob", new
LaunchSparkDataIngestionJobCommand());
SUBCOMMAND_MAP.put("CreateSegment", new CreateSegmentCommand());
SUBCOMMAND_MAP.put("ImportData", new ImportDataCommand());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java
new file mode 100644
index 00000000000..22b359ffe6a
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.client.admin.PinotAdminTransport;
+import org.apache.pinot.client.admin.PinotSegmentApiClient;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionUtils;
+import org.apache.pinot.segment.local.utils.ConsistentDataPushUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties.SegmentNameGeneratorType;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.PinotIngestionJobType;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.GroovyTemplateUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
+
+@CommandLine.Command(name = "LaunchBackfillIngestionJob")
+public class LaunchBackfillIngestionJobCommand extends LaunchDataIngestionJobCommand {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LaunchBackfillIngestionJobCommand.class);
+
+ private static final String OFFLINE_TABLE_TYPE = "OFFLINE";
+
+ @CommandLine.Option(names = {"-startDate"}, required = true, description =
"Backfill start date (inclusive). "
+ + "Segments in backfill date range will be disabled.")
+ private String _backfillStartDate;
+ @CommandLine.Option(names = {"-endDate"}, required = true, description =
"Backfill end date (exclusive). "
+ + "Segments in backfill date range will be disabled.")
+ private String _backfillEndDate;
+ @CommandLine.Option(names = {"-partitionColumn"}, required = false,
description = "The column name on which "
+ + "segments were partitioned using BoundedColumnValue function.")
+ private String _partitionColumn;
+ @CommandLine.Option(names = {"-partitionColumnValue"}, required = false,
description =
+ "Segments with this partition column value " + "will only be eligible
for backfill.")
+ private String _partitionColumnValue;
+ private PinotSegmentApiClient _pinotSegmentApiClient;
+
+ @Override
+ public boolean execute()
+ throws Exception {
+ SegmentGenerationJobSpec spec;
+ try {
+ // Prepare job spec and controller client
+ spec = prepareSegmentGeneratorSpec();
+      URI controllerURI = URI.create(spec.getPinotClusterSpecs()[0].getControllerURI());
+      String controllerAddress = controllerURI.getHost() + ":" + controllerURI.getPort();
+
+ if (StringUtils.isBlank(spec.getAuthToken())) {
+        spec.setAuthToken(AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)
+            .getTaskToken());
+ }
+ Map<String, String> authHeader = getAuthHeaders(spec);
+ Properties transportProperties = getTransportProperties(spec);
+      PinotAdminClient adminClient = new PinotAdminClient(controllerAddress, transportProperties, authHeader);
+
+ _pinotSegmentApiClient = adminClient.getSegmentApiClient();
+ // 1. Fetch existing segments that need to be backfilled (to be replaced)
+ List<String> segmentsToBackfill = fetchSegmentsToBackfill(spec);
+
+ // 2. Generate new segments locally
+ List<String> generatedSegments = generateSegments(spec);
+
+ // Build table name and authentication info
+ String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE)
+ .tableNameWithType(spec.getTableSpec().getTableName());
+ String uploadURL = spec.getPinotClusterSpecs()[0].getControllerURI();
+      AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+
+ // 3. Create a lineage entry to track replacement (old vs new segments)
+ String segmentLineageEntryId = createSegmentLineageEntry(
+          tableNameWithType, uploadURL, segmentsToBackfill, generatedSegments, authProvider);
+
+ // 4. Push new segments to Pinot and mark lineage entry as completed
+      pushSegmentsAndEndReplace(spec, tableNameWithType, uploadURL, segmentLineageEntryId, authProvider, controllerURI);
+ } catch (Exception e) {
+ LOGGER.error("Got exception to generate IngestionJobSpec for backfill
ingestion job - ", e);
+ throw e;
+ }
+ return true;
+ }
+
+ private SegmentGenerationJobSpec prepareSegmentGeneratorSpec()
+ throws Exception {
+ SegmentGenerationJobSpec spec;
+ try {
+ spec = IngestionJobLauncher.getSegmentGenerationJobSpec(
+          _jobSpecFile, _propertyFile, GroovyTemplateUtils.getTemplateContext(_values), System.getenv());
+
+ long currentTime = System.currentTimeMillis();
+      spec.setOutputDirURI(spec.getOutputDirURI() + File.separator + currentTime);
+
+      SegmentNameGeneratorSpec nameGeneratorSpec = spec.getSegmentNameGeneratorSpec();
+ if (nameGeneratorSpec == null) {
+ nameGeneratorSpec = new SegmentNameGeneratorSpec();
+ }
+ nameGeneratorSpec.setType(SegmentNameGeneratorType.SIMPLE);
+ Map<String, String> configs =
+          nameGeneratorSpec.getConfigs() != null ? nameGeneratorSpec.getConfigs() : new HashMap<>();
+      configs.putIfAbsent(SegmentGenerationTaskRunner.SEGMENT_NAME_POSTFIX, String.valueOf(currentTime));
+ nameGeneratorSpec.setConfigs(configs);
+ spec.setSegmentNameGeneratorSpec(nameGeneratorSpec);
+ } catch (Exception e) {
+ LOGGER.error("Got exception to generate IngestionJobSpec for backfill
ingestion job - ", e);
+ throw e;
+ }
+
+ return spec;
+ }
+
+ public Properties getTransportProperties(SegmentGenerationJobSpec spec) {
+ Properties transportProperties = new Properties();
+    URI controllerURI = URI.create(spec.getPinotClusterSpecs()[0].getControllerURI());
+    transportProperties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, controllerURI.getScheme());
+ return transportProperties;
+ }
+
+ public Map<String, String> getAuthHeaders(SegmentGenerationJobSpec spec) {
+ Map<String, String> authHeaders = new HashMap<>();
+ authHeaders.put("Authorization", spec.getAuthToken());
+ return authHeaders;
+ }
+
+ private List<String> fetchSegmentsToBackfill(SegmentGenerationJobSpec spec)
+ throws Exception {
+ List<String> segmentsToBackfill = new ArrayList<>();
+ long startMs = getMillis(_backfillStartDate);
+ long endMs = getMillis(_backfillEndDate);
+
+    JsonNode segmentsJson =
+        _pinotSegmentApiClient.selectSegments(spec.getTableSpec().getTableName(), OFFLINE_TABLE_TYPE,
+            startMs, endMs, true);
+    segmentsJson.findPath(OFFLINE_TABLE_TYPE).forEach(seg -> segmentsToBackfill.add(seg.textValue()));
+
+ // Partition filter
+    if (StringUtils.isNotEmpty(_partitionColumn) && StringUtils.isNotEmpty(_partitionColumnValue)) {
+      segmentsToBackfill.removeIf(segmentName -> !isSegmentMatchPartition(spec, segmentName));
+ }
+
+ LOGGER.info("Existing segments: \n{}",
JsonUtils.objectToString(segmentsToBackfill));
+ return segmentsToBackfill;
+ }
+
+ /**
+ * Checks if a segment matches the specified partition column and value.
+ *
+ * @param spec SegmentGenerationJobSpec containing table and cluster info
+ * @param segmentName Name of the segment to check
+ * @return true if the segment matches the partition, false otherwise
+ */
+  private boolean isSegmentMatchPartition(SegmentGenerationJobSpec spec, String segmentName) {
+ try {
+      JsonNode response = _pinotSegmentApiClient.getSegmentMetadata(
+          TableNameBuilder.OFFLINE.tableNameWithType(spec.getTableSpec().getTableName()), segmentName);
+      Map<String, String> segmentMetadata = JsonUtils.jsonNodeToStringMap(response);
+
+ // Skip segments without partition metadata
+      if (!segmentMetadata.containsKey(CommonConstants.Segment.PARTITION_METADATA)) {
+        return false;
+      }
+
+      SegmentPartitionMetadata partitionMetadata =
+          SegmentPartitionMetadata.fromJsonString(segmentMetadata.get(CommonConstants.Segment.PARTITION_METADATA));
+
+      ColumnPartitionMetadata columnMetadata = partitionMetadata.getColumnPartitionMap().get(_partitionColumn);
+      // NOTE: Currently Pinot only supports at most one partition per column;
+      // multi-partition columns are not yet supported.
+      if (columnMetadata == null || columnMetadata.getPartitions().size() != 1) {
+ return false;
+ }
+
+ // Compute partition ID for the specified partition column value
+ int partitionId = PartitionFunctionFactory.getPartitionFunction(
+ columnMetadata.getFunctionName(),
+ columnMetadata.getNumPartitions(),
+ columnMetadata.getFunctionConfig()
+ ).getPartition(_partitionColumnValue);
+
+ // Return true if segment contains the computed partition
+ return columnMetadata.getPartitions().contains(partitionId);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to fetch partition info for segment [{}],
skipping.", segmentName, e);
+ return false;
+ }
+ }
+
+ private List<String> generateSegments(SegmentGenerationJobSpec spec)
+ throws Exception {
+ spec.setJobType(String.valueOf(PinotIngestionJobType.SegmentCreation));
+ IngestionJobLauncher.runIngestionJob(spec);
+
+ List<String> generatedSegments = collectSegmentNames(spec);
+ LOGGER.info("New segments: \n{}",
JsonUtils.objectToString(generatedSegments));
+ return generatedSegments;
+ }
+
+ private List<String> collectSegmentNames(SegmentGenerationJobSpec spec) {
+ URI outputDirURI;
+ try {
+ outputDirURI = new URI(spec.getOutputDirURI());
+ if (outputDirURI.getScheme() == null) {
+ outputDirURI = new File(spec.getOutputDirURI()).toURI();
+ }
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("outputDirURI is not valid - '" +
spec.getOutputDirURI() + "'");
+ }
+ PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+ // Get list of files to process
+ String[] files;
+ try {
+ files = outputDirFS.listFiles(outputDirURI, true);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
+ }
+
+ List<String> generatedSegments = new ArrayList<>();
+ for (String file : files) {
+ if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+ generatedSegments.add(
+            file.substring(file.lastIndexOf(File.separator) + 1, file.indexOf(Constants.TAR_GZ_FILE_EXT)));
+ }
+ }
+ return generatedSegments;
+ }
+
+  private String createSegmentLineageEntry(String tableNameWithType, String uploadURL,
+ List<String> segmentsToBackfill, List<String> generatedSegments,
+ AuthProvider authProvider)
+ throws Exception {
+ LOGGER.info("Start replacing segments by creating a lineage entry.");
+ String segmentLineageEntryId = SegmentConversionUtils.startSegmentReplace(
+        tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsToBackfill, generatedSegments),
+ authProvider, false);
+ LOGGER.info("Created segment lineage entry: [{}]", segmentLineageEntryId);
+ return segmentLineageEntryId;
+ }
+
+  private void pushSegmentsAndEndReplace(SegmentGenerationJobSpec spec, String tableNameWithType,
+ String uploadURL, String segmentLineageEntryId,
+ AuthProvider authProvider, URI controllerURI)
+ throws Exception {
+ try {
+ spec.setJobType(String.valueOf(PinotIngestionJobType.SegmentTarPush));
+ IngestionJobLauncher.runIngestionJob(spec);
+
+ LOGGER.info("End replacing segments by updating lineage entry: [{}]",
segmentLineageEntryId);
+ SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL,
segmentLineageEntryId,
+ HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, authProvider);
+ LOGGER.info("New segments are pushed successfully and now serving
queries.");
+ } catch (Exception e) {
+ LOGGER.error("Failed to upload segments, reverting lineage entry.", e);
+      Map<URI, String> uriToLineageEntryIdMap = Collections.singletonMap(controllerURI, segmentLineageEntryId);
+      ConsistentDataPushUtils.handleUploadException(spec, uriToLineageEntryIdMap, e);
+      LOGGER.info("Reverted lineage entry [{}]. Any newly uploaded segments should have been deleted.",
+          segmentLineageEntryId);
+ throw e;
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "LaunchBackfillIngestionJob";
+ }
+
+ @Override
+ public String description() {
+ return "Launch a backfill ingestion job.";
+ }
+
+ private long getMillis(String backfillDate) {
+    return LocalDate.parse(backfillDate).atStartOfDay(ZoneId.of("UTC")).toInstant().toEpochMilli();
+ }
+}
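
An illustrative invocation of the new command (dates, paths, and option values
are placeholders; -jobSpecFile and the auth options are inherited from
LaunchDataIngestionJobCommand):

    bin/pinot-admin.sh LaunchBackfillIngestionJob \
      -jobSpecFile /path/to/backfill-job-spec.yaml \
      -startDate 2025-01-01 \
      -endDate 2025-01-08 \
      -partitionColumn memberId \
      -partitionColumnValue 42
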
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index d4f9788fc5a..a5b9d85d35f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -44,23 +44,23 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   private static final Logger LOGGER = LoggerFactory.getLogger(LaunchDataIngestionJobCommand.class);
@CommandLine.Option(names = {"-jobSpecFile", "-jobSpec"}, required = true,
description = "Ingestion job spec file")
- private String _jobSpecFile;
+ protected String _jobSpecFile;
@CommandLine.Option(names = {"-values"}, required = false, arity = "1..*",
description = "Context values set to the job spec template")
- private List<String> _values;
+ protected List<String> _values;
@CommandLine.Option(names = {"-propertyFile"}, required = false,
description = "A property file contains context values to set the job
spec template")
- private String _propertyFile;
+ protected String _propertyFile;
@CommandLine.Option(names = {"-user"}, required = false, description =
"Username for basic auth.")
- private String _user;
+ protected String _user;
@CommandLine.Option(names = {"-password"}, required = false, description =
"Password for basic auth.")
- private String _password;
+ protected String _password;
@CommandLine.Option(names = {"-authToken"}, required = false, description =
"Http auth token.")
- private String _authToken;
+ protected String _authToken;
@CommandLine.Option(names = {"-authTokenUrl"}, required = false, description
= "Http auth token url.")
- private String _authTokenUrl;
+ protected String _authTokenUrl;
- private AuthProvider _authProvider;
+ protected AuthProvider _authProvider;
public String getJobSpecFile() {
return _jobSpecFile;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]