This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe46c3038ab [Feature] Develop LaunchBackfillIngestionJob to support complete backfill (#16890)
fe46c3038ab is described below

commit fe46c3038ab9f48408ecb5592b8732a270ea10cd
Author: Hongkun Xu <[email protected]>
AuthorDate: Wed Nov 5 00:19:23 2025 +0800

    [Feature] Develop LaunchBackfillIngestionJob to support complete backfill (#16890)
    
    * Develop LaunchBackfillIngestionJob to support complete backfill
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * Integrate with PinotAdminClient
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    * Add comments and replaced parameters
    
    Signed-off-by: Hongkun Xu <[email protected]>
    
    ---------
    
    Signed-off-by: Hongkun Xu <[email protected]>
---
 .../pinot/client/admin/PinotAdminClient.java       |  13 +
 .../pinot/client/admin/PinotAdminTransport.java    |   6 +-
 .../pinot/client/admin/PinotSegmentApiClient.java  |  76 +++++
 pinot-tools/pom.xml                                |   4 +
 .../pinot/tools/admin/PinotAdministrator.java      |   2 +
 .../command/LaunchBackfillIngestionJobCommand.java | 323 +++++++++++++++++++++
 .../command/LaunchDataIngestionJobCommand.java     |  16 +-
 7 files changed, 430 insertions(+), 10 deletions(-)
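
For reference, a minimal invocation sketch of the new subcommand (the job spec path, dates, and partition values below are hypothetical; the flags are taken from the command definitions in this diff, where -startDate is inclusive and -endDate exclusive, both parsed as UTC dates):

    bin/pinot-admin.sh LaunchBackfillIngestionJob \
      -jobSpecFile /path/to/backfill-job-spec.yaml \
      -startDate 2025-01-01 \
      -endDate 2025-01-08 \
      -partitionColumn memberId \
      -partitionColumnValue 42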

diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
index 701040111f4..0032a944475 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClient.java
@@ -44,6 +44,7 @@ public class PinotAdminClient implements AutoCloseable {
   private PinotSegmentAdminClient _segmentClient;
   private PinotTenantAdminClient _tenantClient;
   private PinotTaskAdminClient _taskClient;
+  private PinotSegmentApiClient _segmentApiClient;
 
   /**
    * Creates a PinotAdminClient with the specified controller address.
@@ -124,6 +125,18 @@ public class PinotAdminClient implements AutoCloseable {
     return _tableClient;
   }
 
+  /**
+   * Gets the segment API client.
+   *
+   * @return Segment REST API operations
+   */
+  public PinotSegmentApiClient getSegmentApiClient() {
+    if (_segmentApiClient == null) {
+      _segmentApiClient = new PinotSegmentApiClient(_transport, _controllerAddress, _headers);
+    }
+    return _segmentApiClient;
+  }
+
   /**
    * Gets the schema administration client.
    *
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
index 1a3e680f039..934b59ec6a4 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminTransport.java
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
 public class PinotAdminTransport implements AutoCloseable {
  private static final Logger LOGGER = LoggerFactory.getLogger(PinotAdminTransport.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  public static final String ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS = "pinot.admin.request.timeout.ms";
+  public static final String ADMIN_TRANSPORT_SCHEME = "pinot.admin.scheme";
 
   /**
    * Gets the ObjectMapper instance for JSON serialization/deserialization.
@@ -68,10 +70,10 @@ public class PinotAdminTransport implements AutoCloseable {
     _defaultHeaders = authHeaders != null ? authHeaders : Map.of();
 
     // Extract timeout configuration
-    _requestTimeoutMs = Integer.parseInt(properties.getProperty("pinot.admin.request.timeout.ms", "60000"));
+    _requestTimeoutMs = Integer.parseInt(properties.getProperty(ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, "60000"));
 
     // Extract scheme (http/https)
-    String scheme = properties.getProperty("pinot.admin.scheme", CommonConstants.HTTP_PROTOCOL);
+    String scheme = properties.getProperty(ADMIN_TRANSPORT_SCHEME, CommonConstants.HTTP_PROTOCOL);
     _scheme = scheme;
 
     // Build HTTP client
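
The two constants introduced above replace previously hardcoded property keys, so callers can configure the transport without magic strings. A minimal sketch of their use (the values are illustrative; per the code above, the defaults are 60000 ms and http):

    Properties props = new Properties();
    // Lower the request timeout from the 60000 ms default
    props.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_REQUEST_TIMEOUT_MS, "30000");
    // Talk to the controller over https instead of the default http
    props.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, "https");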
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java
new file mode 100644
index 00000000000..644ec26f57e
--- /dev/null
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotSegmentApiClient.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.admin;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * The <code>PinotSegmentApiClient</code> class provides an HTTP client for invoking segment REST APIs.
+ */
+public class PinotSegmentApiClient implements Closeable {
+  private final PinotAdminTransport _transport;
+  private final String _controllerAddress;
+  private final Map<String, String> _headers;
+
+  public PinotSegmentApiClient(PinotAdminTransport transport, String controllerAddress,
+      Map<String, String> headers) {
+    _transport = transport;
+    _controllerAddress = controllerAddress;
+    _headers = headers;
+  }
+
+  public static class QueryParameters {
+    public static final String TABLE_NAME = "tableName";
+    public static final String TYPE = "type";
+    public static final String START_TIMESTAMP = "startTimestamp";
+    public static final String END_TIMESTAMP = "endTimestamp";
+    public static final String EXCLUDE_OVERLAPPING = "excludeOverlapping";
+  }
+
+  private static final String SEGMENT_PATH = "/segments";
+  private static final String SELECT_PATH = "/select";
+  private static final String METADATA_PATH = "/metadata";
+
+  public JsonNode selectSegments(String rawTableName, String tableType, long startTimestamp, long endTimestamp,
+      boolean excludeOverlapping) throws PinotAdminException {
+    Map<String, String> queryParams = new HashMap<>();
+    queryParams.put(QueryParameters.START_TIMESTAMP, String.valueOf(startTimestamp));
+    queryParams.put(QueryParameters.END_TIMESTAMP, String.valueOf(endTimestamp));
+    queryParams.put(QueryParameters.EXCLUDE_OVERLAPPING, String.valueOf(excludeOverlapping));
+    queryParams.put(QueryParameters.TYPE, tableType);
+    return _transport.executeGet(_controllerAddress, SEGMENT_PATH + "/" + rawTableName + SELECT_PATH,
+        queryParams, _headers);
+  }
+
+  public JsonNode getSegmentMetadata(String tableNameWithType, String segmentName) throws PinotAdminException {
+    return _transport.executeGet(_controllerAddress,
+        SEGMENT_PATH + "/" + tableNameWithType + "/" + segmentName + METADATA_PATH, null, _headers);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _transport.close();
+  }
+}
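
A short usage sketch of the new client; the controller address, table name, and segment name are hypothetical, the three-argument PinotAdminClient constructor mirrors the one used by the backfill command below, and the calling method is assumed to declare throws Exception:

    try (PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", new Properties(), Map.of())) {
      PinotSegmentApiClient segmentApi = adminClient.getSegmentApiClient();
      // selectSegments takes the raw table name; excludeOverlapping=true matches what the backfill command passes
      JsonNode selected = segmentApi.selectSegments("myTable", "OFFLINE", 1735689600000L, 1736294400000L, true);
      // getSegmentMetadata takes the table name with type plus a segment name
      JsonNode metadata = segmentApi.getSegmentMetadata("myTable_OFFLINE", "myTable_OFFLINE_0");
    }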
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index fdccae67156..b77dbb050ae 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -191,6 +191,10 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-java-client</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index b387f2cb604..be4f22ec33a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -43,6 +43,7 @@ import org.apache.pinot.tools.admin.command.GenerateDataCommand;
 import org.apache.pinot.tools.admin.command.GitHubEventsQuickStartCommand;
 import org.apache.pinot.tools.admin.command.ImportDataCommand;
 import org.apache.pinot.tools.admin.command.JsonToPinotSchema;
+import org.apache.pinot.tools.admin.command.LaunchBackfillIngestionJobCommand;
 import org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand;
 import org.apache.pinot.tools.admin.command.LaunchSparkDataIngestionJobCommand;
 import org.apache.pinot.tools.admin.command.MoveReplicaGroup;
@@ -100,6 +101,7 @@ public class PinotAdministrator {
     SUBCOMMAND_MAP.put("OperateClusterConfig", new 
OperateClusterConfigCommand());
     SUBCOMMAND_MAP.put("GenerateData", new GenerateDataCommand());
     SUBCOMMAND_MAP.put("LaunchDataIngestionJob", new 
LaunchDataIngestionJobCommand());
+    SUBCOMMAND_MAP.put("LaunchBackfillIngestionJob", new 
LaunchBackfillIngestionJobCommand());
     SUBCOMMAND_MAP.put("LaunchSparkDataIngestionJob", new 
LaunchSparkDataIngestionJobCommand());
     SUBCOMMAND_MAP.put("CreateSegment", new CreateSegmentCommand());
     SUBCOMMAND_MAP.put("ImportData", new ImportDataCommand());
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java
new file mode 100644
index 00000000000..22b359ffe6a
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.client.admin.PinotAdminTransport;
+import org.apache.pinot.client.admin.PinotSegmentApiClient;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.minion.tasks.SegmentConversionUtils;
+import org.apache.pinot.segment.local.utils.ConsistentDataPushUtils;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties.SegmentNameGeneratorType;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.PinotIngestionJobType;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.GroovyTemplateUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
+
+@CommandLine.Command(name = "LaunchBackfillIngestionJob")
+public class LaunchBackfillIngestionJobCommand extends LaunchDataIngestionJobCommand {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LaunchBackfillIngestionJobCommand.class);
+
+  private static final String OFFLINE_TABLE_TYPE = "OFFLINE";
+
+  @CommandLine.Option(names = {"-startDate"}, required = true, description = 
"Backfill start date (inclusive). "
+      + "Segments in backfill date range will be disabled.")
+  private String _backfillStartDate;
+  @CommandLine.Option(names = {"-endDate"}, required = true, description = 
"Backfill end date (exclusive). "
+      + "Segments in backfill date range will be disabled.")
+  private String _backfillEndDate;
+  @CommandLine.Option(names = {"-partitionColumn"}, required = false, 
description = "The column name on which "
+      + "segments were partitioned using BoundedColumnValue function.")
+  private String _partitionColumn;
+  @CommandLine.Option(names = {"-partitionColumnValue"}, required = false, 
description =
+      "Segments with this partition column value " + "will only be eligible 
for backfill.")
+  private String _partitionColumnValue;
+  private PinotSegmentApiClient _pinotSegmentApiClient;
+
+  @Override
+  public boolean execute()
+      throws Exception {
+    SegmentGenerationJobSpec spec;
+    try {
+      // Prepare job spec and controller client
+      spec = prepareSegmentGeneratorSpec();
+      URI controllerURI = URI.create(spec.getPinotClusterSpecs()[0].getControllerURI());
+      String controllerAddress = controllerURI.getHost() + ":" + controllerURI.getPort();
+
+      if (StringUtils.isBlank(spec.getAuthToken())) {
+        spec.setAuthToken(AuthProviderUtils.makeAuthProvider(_authProvider, _authTokenUrl, _authToken, _user, _password)
+            .getTaskToken());
+      }
+      Map<String, String> authHeader = getAuthHeaders(spec);
+      Properties transportProperties = getTransportProperties(spec);
+      PinotAdminClient adminClient = new PinotAdminClient(controllerAddress, transportProperties, authHeader);
+
+      _pinotSegmentApiClient = adminClient.getSegmentApiClient();
+      // 1. Fetch existing segments that need to be backfilled (to be replaced)
+      List<String> segmentsToBackfill = fetchSegmentsToBackfill(spec);
+
+      // 2. Generate new segments locally
+      List<String> generatedSegments = generateSegments(spec);
+
+      // Build table name and authentication info
+      String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE)
+          .tableNameWithType(spec.getTableSpec().getTableName());
+      String uploadURL = spec.getPinotClusterSpecs()[0].getControllerURI();
+      AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
+
+      // 3. Create a lineage entry to track replacement (old vs new segments)
+      String segmentLineageEntryId = createSegmentLineageEntry(
+          tableNameWithType, uploadURL, segmentsToBackfill, generatedSegments, authProvider);
+
+      // 4. Push new segments to Pinot and mark lineage entry as completed
+      pushSegmentsAndEndReplace(spec, tableNameWithType, uploadURL, segmentLineageEntryId, authProvider, controllerURI);
+    } catch (Exception e) {
+      LOGGER.error("Got exception to generate IngestionJobSpec for backfill 
ingestion job - ", e);
+      throw e;
+    }
+    return true;
+  }
+
+  private SegmentGenerationJobSpec prepareSegmentGeneratorSpec()
+      throws Exception {
+    SegmentGenerationJobSpec spec;
+    try {
+      spec = IngestionJobLauncher.getSegmentGenerationJobSpec(
+          _jobSpecFile, _propertyFile, GroovyTemplateUtils.getTemplateContext(_values), System.getenv());
+
+      long currentTime = System.currentTimeMillis();
+      spec.setOutputDirURI(spec.getOutputDirURI() + File.separator + currentTime);
+
+      SegmentNameGeneratorSpec nameGeneratorSpec = spec.getSegmentNameGeneratorSpec();
+      if (nameGeneratorSpec == null) {
+        nameGeneratorSpec = new SegmentNameGeneratorSpec();
+      }
+      nameGeneratorSpec.setType(SegmentNameGeneratorType.SIMPLE);
+      Map<String, String> configs =
+          nameGeneratorSpec.getConfigs() != null ? nameGeneratorSpec.getConfigs() : new HashMap<>();
+      configs.putIfAbsent(SegmentGenerationTaskRunner.SEGMENT_NAME_POSTFIX, String.valueOf(currentTime));
+      nameGeneratorSpec.setConfigs(configs);
+      spec.setSegmentNameGeneratorSpec(nameGeneratorSpec);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while generating IngestionJobSpec for backfill ingestion job - ", e);
+      throw e;
+    }
+
+    return spec;
+  }
+
+  public Properties getTransportProperties(SegmentGenerationJobSpec spec) {
+    Properties transportProperties = new Properties();
+    URI controllerURI = URI.create(spec.getPinotClusterSpecs()[0].getControllerURI());
+    transportProperties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, controllerURI.getScheme());
+    return transportProperties;
+  }
+
+  public Map<String, String> getAuthHeaders(SegmentGenerationJobSpec spec) {
+    Map<String, String> authHeaders = new HashMap<>();
+    authHeaders.put("Authorization", spec.getAuthToken());
+    return authHeaders;
+  }
+
+  private List<String> fetchSegmentsToBackfill(SegmentGenerationJobSpec spec)
+      throws Exception {
+    List<String> segmentsToBackfill = new ArrayList<>();
+    long startMs = getMillis(_backfillStartDate);
+    long endMs = getMillis(_backfillEndDate);
+
+    JsonNode segmentsJson =
+        _pinotSegmentApiClient.selectSegments(spec.getTableSpec().getTableName(), OFFLINE_TABLE_TYPE,
+            startMs, endMs, true);
+    segmentsJson.findPath(OFFLINE_TABLE_TYPE).forEach(seg -> segmentsToBackfill.add(seg.textValue()));
+
+    // Partition filter
+    if (StringUtils.isNotEmpty(_partitionColumn) && StringUtils.isNotEmpty(_partitionColumnValue)) {
+      segmentsToBackfill.removeIf(segmentName -> !isSegmentMatchPartition(spec, segmentName));
+    }
+
+    LOGGER.info("Existing segments: \n{}", 
JsonUtils.objectToString(segmentsToBackfill));
+    return segmentsToBackfill;
+  }
+
+  /**
+   * Checks if a segment matches the specified partition column and value.
+   *
+   * @param spec SegmentGenerationJobSpec containing table and cluster info
+   * @param segmentName Name of the segment to check
+   * @return true if the segment matches the partition, false otherwise
+   */
+  private boolean isSegmentMatchPartition(SegmentGenerationJobSpec spec, String segmentName) {
+    try {
+      JsonNode response = _pinotSegmentApiClient.getSegmentMetadata(
+          TableNameBuilder.OFFLINE.tableNameWithType(spec.getTableSpec().getTableName()),
+          segmentName);
+      Map<String, String> segmentMetadata = JsonUtils.jsonNodeToStringMap(response);
+
+      // Skip segments without partition metadata
+      if (!segmentMetadata.containsKey(CommonConstants.Segment.PARTITION_METADATA)) {
+        return false;
+      }
+
+      SegmentPartitionMetadata partitionMetadata =
+          SegmentPartitionMetadata.fromJsonString(segmentMetadata.get(CommonConstants.Segment.PARTITION_METADATA));
+
+      ColumnPartitionMetadata columnMetadata = partitionMetadata.getColumnPartitionMap().get(_partitionColumn);
+      // NOTE: Currently Pinot only supports at most one partition per column;
+      // multi-partition columns are not yet supported.
+      if (columnMetadata == null || columnMetadata.getPartitions().size() != 1) {
+        return false;
+      }
+
+      // Compute partition ID for the specified partition column value
+      int partitionId = PartitionFunctionFactory.getPartitionFunction(
+          columnMetadata.getFunctionName(),
+          columnMetadata.getNumPartitions(),
+          columnMetadata.getFunctionConfig()
+      ).getPartition(_partitionColumnValue);
+
+      // Return true if segment contains the computed partition
+      return columnMetadata.getPartitions().contains(partitionId);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to fetch partition info for segment [{}], 
skipping.", segmentName, e);
+      return false;
+    }
+  }
+
+  private List<String> generateSegments(SegmentGenerationJobSpec spec)
+      throws Exception {
+    spec.setJobType(String.valueOf(PinotIngestionJobType.SegmentCreation));
+    IngestionJobLauncher.runIngestionJob(spec);
+
+    List<String> generatedSegments = collectSegmentNames(spec);
+    LOGGER.info("New segments: \n{}", 
JsonUtils.objectToString(generatedSegments));
+    return generatedSegments;
+  }
+
+  private List<String> collectSegmentNames(SegmentGenerationJobSpec spec) {
+    URI outputDirURI;
+    try {
+      outputDirURI = new URI(spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(spec.getOutputDirURI()).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("outputDirURI is not valid - '" + 
spec.getOutputDirURI() + "'");
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+    // Get list of files to process
+    String[] files;
+    try {
+      files = outputDirFS.listFiles(outputDirURI, true);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
+    }
+
+    List<String> generatedSegments = new ArrayList<>();
+    for (String file : files) {
+      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        generatedSegments.add(
+            file.substring(file.lastIndexOf(File.separator) + 1, file.indexOf(Constants.TAR_GZ_FILE_EXT)));
+      }
+    }
+    return generatedSegments;
+  }
+
+  private String createSegmentLineageEntry(String tableNameWithType, String uploadURL,
+      List<String> segmentsToBackfill, List<String> generatedSegments,
+      AuthProvider authProvider)
+      throws Exception {
+    LOGGER.info("Start replacing segments by creating a lineage entry.");
+    String segmentLineageEntryId = SegmentConversionUtils.startSegmentReplace(
+        tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsToBackfill, generatedSegments),
+        authProvider, false);
+    LOGGER.info("Created segment lineage entry: [{}]", segmentLineageEntryId);
+    return segmentLineageEntryId;
+  }
+
+  private void pushSegmentsAndEndReplace(SegmentGenerationJobSpec spec, String tableNameWithType,
+      String uploadURL, String segmentLineageEntryId,
+      AuthProvider authProvider, URI controllerURI)
+      throws Exception {
+    try {
+      spec.setJobType(String.valueOf(PinotIngestionJobType.SegmentTarPush));
+      IngestionJobLauncher.runIngestionJob(spec);
+
+      LOGGER.info("End replacing segments by updating lineage entry: [{}]", 
segmentLineageEntryId);
+      SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, 
segmentLineageEntryId,
+          HttpClient.DEFAULT_SOCKET_TIMEOUT_MS, authProvider);
+      LOGGER.info("New segments are pushed successfully and now serving 
queries.");
+    } catch (Exception e) {
+      LOGGER.error("Failed to upload segments, reverting lineage entry.", e);
+      Map<URI, String> uriToLineageEntryIdMap = Collections.singletonMap(controllerURI, segmentLineageEntryId);
+      ConsistentDataPushUtils.handleUploadException(spec, uriToLineageEntryIdMap, e);
+      LOGGER.info("Reverted lineage entry [{}]. Any newly uploaded segments should have been deleted.",
+          segmentLineageEntryId);
+      throw e;
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "LaunchBackfillIngestionJob";
+  }
+
+  @Override
+  public String description() {
+    return "Launch a backfill ingestion job.";
+  }
+
+  private long getMillis(String backfillDate) {
+    return LocalDate.parse(backfillDate).atStartOfDay(ZoneId.of("UTC")).toInstant().toEpochMilli();
+  }
+}
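
The -startDate/-endDate strings are parsed by getMillis as ISO-8601 local dates and converted to epoch millis at UTC midnight, so the backfill window is [startDate 00:00:00 UTC, endDate 00:00:00 UTC). A quick sanity check of that conversion:

    // Same conversion as getMillis("2025-01-01") above: 2025-01-01T00:00:00Z -> 1735689600000 ms
    long startMs = LocalDate.parse("2025-01-01").atStartOfDay(ZoneId.of("UTC")).toInstant().toEpochMilli();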
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index d4f9788fc5a..a5b9d85d35f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -44,23 +44,23 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl
   private static final Logger LOGGER = LoggerFactory.getLogger(LaunchDataIngestionJobCommand.class);
   @CommandLine.Option(names = {"-jobSpecFile", "-jobSpec"}, required = true,
       description = "Ingestion job spec file")
-  private String _jobSpecFile;
+  protected String _jobSpecFile;
   @CommandLine.Option(names = {"-values"}, required = false, arity = "1..*",
       description = "Context values set to the job spec template")
-  private List<String> _values;
+  protected List<String> _values;
   @CommandLine.Option(names = {"-propertyFile"}, required = false,
       description = "A property file contains context values to set the job 
spec template")
-  private String _propertyFile;
+  protected String _propertyFile;
   @CommandLine.Option(names = {"-user"}, required = false, description = 
"Username for basic auth.")
-  private String _user;
+  protected String _user;
   @CommandLine.Option(names = {"-password"}, required = false, description = 
"Password for basic auth.")
-  private String _password;
+  protected String _password;
   @CommandLine.Option(names = {"-authToken"}, required = false, description = 
"Http auth token.")
-  private String _authToken;
+  protected String _authToken;
   @CommandLine.Option(names = {"-authTokenUrl"}, required = false, description 
= "Http auth token url.")
-  private String _authTokenUrl;
+  protected String _authTokenUrl;
 
-  private AuthProvider _authProvider;
+  protected AuthProvider _authProvider;
 
   public String getJobSpecFile() {
     return _jobSpecFile;
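
Note that the private-to-protected changes in this file are what let the new LaunchBackfillIngestionJobCommand subclass reuse the parent's parsed options (_jobSpecFile, _values, _propertyFile, and the auth-related fields) directly in its prepareSegmentGeneratorSpec() and execute() methods.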

