This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 409e5da SegmentUploader impl (#6740)
409e5da is described below
commit 409e5da344c8ab4077fd318f6a3f801377b1b958
Author: Neha Pawar <[email protected]>
AuthorDate: Thu Apr 8 17:56:44 2021 -0700
SegmentUploader impl (#6740)
A default implementation for the SegmentUploader.
This is a followup to #6718. With this PR, the SegmentWriter and
SegmentUploader will be ready for use by other initiatives.
---
pinot-controller/pom.xml | 5 +
.../pinot/controller/util/FileIngestionHelper.java | 39 ++++-
.../api/PinotIngestionRestletResourceTest.java | 14 +-
.../pinot/controller/helix/ControllerTest.java | 5 +
.../org/apache/pinot/core/util/IngestionUtils.java | 165 +++++++++++++++------
.../apache/pinot/core/util}/SegmentPushUtils.java | 2 +-
.../pinot/core/util}/SegmentPushUtilsTest.java | 2 +-
pinot-distribution/pinot-assembly.xml | 6 +
pinot-integration-tests/pom.xml | 5 +
.../SegmentWriterUploaderIntegrationTest.java | 88 ++++++++---
.../hadoop/HadoopSegmentMetadataPushJobRunner.java | 2 +-
.../hadoop/HadoopSegmentTarPushJobRunner.java | 2 +-
.../hadoop/HadoopSegmentUriPushJobRunner.java | 2 +-
.../spark/SparkSegmentMetadataPushJobRunner.java | 2 +-
.../batch/spark/SparkSegmentTarPushJobRunner.java | 2 +-
.../batch/spark/SparkSegmentUriPushJobRunner.java | 2 +-
.../standalone/SegmentMetadataPushJobRunner.java | 2 +-
.../batch/standalone/SegmentTarPushJobRunner.java | 2 +-
.../batch/standalone/SegmentUriPushJobRunner.java | 2 +-
.../SegmentGenerationAndPushTaskExecutor.java | 3 +-
.../pinot-segment-uploader-default/pom.xml | 53 +++++++
.../segmentuploader/SegmentUploaderDefault.java | 100 +++++++++++++
pinot-plugins/pinot-segment-uploader/pom.xml | 67 +++++++++
.../filebased/FileBasedSegmentWriter.java | 2 +-
pinot-plugins/pom.xml | 1 +
.../org/apache/pinot/spi/auth/AuthContext.java | 34 +++++
.../pinot/spi/ingestion/batch/BatchConfig.java | 52 ++++++-
.../spi/ingestion/batch/BatchConfigProperties.java | 3 +
.../segment/uploader/SegmentUploader.java | 11 +-
.../pinot/spi/utils/IngestionConfigUtils.java | 54 ++++++-
30 files changed, 624 insertions(+), 105 deletions(-)
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 52ede83..6d25fd6 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -73,6 +73,11 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-segment-uploader-default</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-yammer</artifactId>
<scope>test</scope>
</dependency>
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 169c41a..73615f4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -30,17 +30,23 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.controller.api.resources.SuccessResponse;
import org.apache.pinot.core.util.IngestionUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.auth.AuthContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.slf4j.Logger;
@@ -53,6 +59,7 @@ import org.slf4j.LoggerFactory;
*/
public class FileIngestionHelper {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileIngestionHelper.class);
+ private static final String SEGMENT_UPLOADER_CLASS =
"org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault";
private static final String WORKING_DIR_PREFIX = "working_dir";
private static final String INPUT_DATA_DIR = "input_data_dir";
@@ -65,7 +72,7 @@ public class FileIngestionHelper {
private final BatchConfig _batchConfig;
private final URI _controllerUri;
private final File _uploadDir;
- private final String _authToken;
+ private final AuthContext _authContext;
public FileIngestionHelper(TableConfig tableConfig, Schema schema,
BatchConfig batchConfig, URI controllerUri,
File uploadDir, String authToken) {
@@ -74,7 +81,7 @@ public class FileIngestionHelper {
_batchConfig = batchConfig;
_controllerUri = controllerUri;
_uploadDir = uploadDir;
- _authToken = authToken;
+ _authContext = new AuthContext(authToken);
}
/**
@@ -108,14 +115,25 @@ public class FileIngestionHelper {
LOGGER.info("Copied multipart payload to local file: {}",
inputDir.getAbsolutePath());
}
- // Get SegmentGeneratorConfig
+ // Update batch config map with values for file upload
Map<String, String> batchConfigMapOverride = new
HashMap<>(_batchConfig.getBatchConfigMap());
batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI,
inputFile.getAbsolutePath());
batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI,
outputDir.getAbsolutePath());
+ batchConfigMapOverride.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
_controllerUri.toString());
+ String segmentNamePostfixProp = String.format("%s.%s",
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
+ BatchConfigProperties.SEGMENT_NAME_POSTFIX);
+ if
(StringUtils.isBlank(batchConfigMapOverride.get(segmentNamePostfixProp))) {
+ // Default segmentNameGenerator is SIMPLE.
+ // Adding this suffix to prevent creating a segment with the same name
as an existing segment,
+ // if a file with the same time range is received again
+ batchConfigMapOverride.put(segmentNamePostfixProp,
String.valueOf(System.currentTimeMillis()));
+ }
BatchIngestionConfig batchIngestionConfigOverride =
new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+ // Get SegmentGeneratorConfig
SegmentGeneratorConfig segmentGeneratorConfig =
IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema,
batchIngestionConfigOverride);
@@ -123,13 +141,20 @@ public class FileIngestionHelper {
String segmentName = IngestionUtils.buildSegment(segmentGeneratorConfig);
LOGGER.info("Built segment: {}", segmentName);
- // Tar and push segment
+ // Tar segment dir
File segmentTarFile =
new File(segmentTarDir, segmentName +
org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT);
TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName),
segmentTarFile);
- IngestionUtils
- .uploadSegment(tableNameWithType,
Lists.newArrayList(segmentTarFile), _controllerUri, _authToken);
- LOGGER.info("Uploaded tar: {} to {}", segmentTarFile.getAbsolutePath(),
_controllerUri);
+
+ // Upload segment
+ IngestionConfig ingestionConfigOverride = new
IngestionConfig(batchIngestionConfigOverride, null, null, null);
+ TableConfig tableConfigOverride =
+ new
TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName())
+ .setIngestionConfig(ingestionConfigOverride).build();
+ SegmentUploader segmentUploader =
PluginManager.get().createInstance(SEGMENT_UPLOADER_CLASS);
+ segmentUploader.init(tableConfigOverride);
+ segmentUploader.uploadSegment(segmentTarFile.toURI(), _authContext);
+ LOGGER.info("Uploaded tar: {} to table: {}",
segmentTarFile.getAbsolutePath(), tableNameWithType);
return new SuccessResponse(
"Successfully ingested file into table: " + tableNameWithType + " as
segment: " + segmentName);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
index a688a00..044347c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceTest.java
@@ -22,11 +22,6 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,19 +30,16 @@ import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.entity.mime.MultipartEntity;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
-import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -98,8 +90,8 @@ public class PinotIngestionRestletResourceTest extends
ControllerTest {
// ingest from file
Map<String, String> batchConfigMap = new HashMap<>();
- batchConfigMap.put("inputFormat", "csv");
- batchConfigMap.put("recordReader.prop.delimiter", "|");
+ batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "csv");
+ batchConfigMap.put(String.format("%s.delimiter",
BatchConfigProperties.RECORD_READER_PROP_PREFIX), "|");
sendHttpPost(_controllerRequestURLBuilder.forIngestFromFile(TABLE_NAME_WITH_TYPE,
batchConfigMap));
segments = _helixResourceManager.getSegmentsFor(TABLE_NAME_WITH_TYPE);
Assert.assertEquals(segments.size(), 1);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 376b75b..478b784 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -531,6 +531,11 @@ public abstract class ControllerTest {
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
}
+ protected void dropAllSegments(String tableName, TableType tableType) throws
IOException {
+ sendDeleteRequest(
+ _controllerRequestURLBuilder.forSegmentDeleteAllAPI(tableName,
tableType.toString()));
+ }
+
protected void reloadOfflineTable(String tableName) throws IOException {
sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName,
TableType.OFFLINE.name()), null);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
index baaac9c..5328cf9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
@@ -19,21 +19,17 @@
package org.apache.pinot.core.util;
import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.common.exception.HttpErrorStatusException;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
-import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -42,6 +38,7 @@ import
org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
import
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.auth.AuthContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
@@ -53,16 +50,20 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -70,13 +71,7 @@ import org.slf4j.LoggerFactory;
*/
public final class IngestionUtils {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionUtils.class);
-
- private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
- BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
- private static final long DEFAULT_RETRY_WAIT_MS = 1000L;
- private static final int DEFAULT_ATTEMPTS = 3;
- private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
+ private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
private IngestionUtils() {
}
@@ -143,9 +138,6 @@ public final class IngestionUtils {
String rawTableName =
TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType());
String segmentNameGeneratorType =
batchConfig.getSegmentNameGeneratorType();
- if (segmentNameGeneratorType == null) {
- segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE;
- }
switch (segmentNameGeneratorType) {
case BatchConfigProperties.SegmentNameGeneratorType.FIXED:
return new FixedSegmentNameGenerator(batchConfig.getSegmentName());
@@ -185,40 +177,117 @@ public final class IngestionUtils {
}
/**
- * Uploads the segment tar files to the provided controller
+ * Uploads the segments from the provided segmentTar URIs to the table,
using push details from the batchConfig
+ * @param tableNameWithType name of the table to upload the segment
+ * @param batchConfig batchConfig with details about push such as
controllerURI, pushAttempts, pushParallelism, etc
+ * @param segmentTarURIs list of URI for the segment tar files
+ * @param authContext auth details required to upload the Pinot segment to
controller
*/
- public static void uploadSegment(String tableNameWithType, List<File>
tarFiles, URI controllerUri,
- final String authToken)
- throws RetriableOperationException, AttemptsExceededException {
- for (File tarFile : tarFiles) {
- String fileName = tarFile.getName();
-
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
- String segmentName = fileName.substring(0, fileName.length() -
Constants.TAR_GZ_FILE_EXT.length());
-
- RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS,
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
- try (InputStream inputStream = new FileInputStream(tarFile)) {
- SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
-
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri),
segmentName, inputStream,
- FileUploadDownloadClient.makeAuthHeader(authToken),
- FileUploadDownloadClient.makeTableParam(tableNameWithType),
- FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
- LOGGER.info("Response for pushing table {} segment {} - {}: {}",
tableNameWithType, segmentName,
- response.getStatusCode(), response.getResponse());
- return true;
- } catch (HttpErrorStatusException e) {
- int statusCode = e.getStatusCode();
- if (statusCode >= 500) {
- LOGGER.warn("Caught temporary exception while pushing table: {}
segment: {}, will retry", tableNameWithType,
- segmentName, e);
- return false;
- } else {
- throw e;
+ public static void uploadSegment(String tableNameWithType, BatchConfig
batchConfig, List<URI> segmentTarURIs,
+ @Nullable AuthContext authContext)
+ throws Exception {
+
+ SegmentGenerationJobSpec segmentUploadSpec =
generateSegmentUploadSpec(tableNameWithType, batchConfig, authContext);
+
+ List<String> segmentTarURIStrs =
segmentTarURIs.stream().map(URI::toString).collect(Collectors.toList());
+ String pushMode = batchConfig.getPushMode();
+ switch
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+ case TAR:
+ try {
+ SegmentPushUtils.pushSegments(segmentUploadSpec, LOCAL_PINOT_FS,
segmentTarURIStrs);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(String
+ .format("Caught exception while uploading segments. Push mode:
TAR, segment tars: [%s]",
+ segmentTarURIStrs), e);
+ }
+ break;
+ case URI:
+ List<String> segmentUris = new ArrayList<>();
+ try {
+ URI outputSegmentDirURI = null;
+ if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+ outputSegmentDirURI =
URI.create(batchConfig.getOutputSegmentDirURI());
+ }
+ for (URI segmentTarURI : segmentTarURIs) {
+ URI updatedURI =
SegmentPushUtils.generateSegmentTarURI(outputSegmentDirURI, segmentTarURI,
+ segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+ segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix());
+ segmentUris.add(updatedURI.toString());
}
+ SegmentPushUtils.sendSegmentUris(segmentUploadSpec, segmentUris);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(String
+ .format("Caught exception while uploading segments. Push mode:
URI, segment URIs: [%s]", segmentUris), e);
}
- });
+ break;
+ case METADATA:
+ try {
+ URI outputSegmentDirURI = null;
+ if (StringUtils.isNotBlank(batchConfig.getOutputSegmentDirURI())) {
+ outputSegmentDirURI =
URI.create(batchConfig.getOutputSegmentDirURI());
+ }
+ PinotFS outputFileFS = getOutputPinotFS(batchConfig,
outputSegmentDirURI);
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(outputSegmentDirURI,
segmentUploadSpec.getPushJobSpec().getSegmentUriPrefix(),
+ segmentUploadSpec.getPushJobSpec().getSegmentUriSuffix(),
new String[]{segmentTarURIs.toString()});
+ SegmentPushUtils.sendSegmentUriAndMetadata(segmentUploadSpec,
outputFileFS, segmentUriToTarPathMap);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(String
+ .format("Caught exception while uploading segments. Push mode:
METADATA, segment URIs: [%s]",
+ segmentTarURIStrs), e);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unrecognized push mode - " +
pushMode);
}
}
+ private static SegmentGenerationJobSpec generateSegmentUploadSpec(String
tableName, BatchConfig batchConfig,
+ @Nullable AuthContext authContext) {
+
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(tableName);
+
+ PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+ pinotClusterSpec.setControllerURI(batchConfig.getPushControllerURI());
+ PinotClusterSpec[] pinotClusterSpecs = new
PinotClusterSpec[]{pinotClusterSpec};
+
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setPushAttempts(batchConfig.getPushAttempts());
+ pushJobSpec.setPushParallelism(batchConfig.getPushParallelism());
+
pushJobSpec.setPushRetryIntervalMillis(batchConfig.getPushIntervalRetryMillis());
+ pushJobSpec.setSegmentUriPrefix(batchConfig.getPushSegmentURIPrefix());
+ pushJobSpec.setSegmentUriSuffix(batchConfig.getPushSegmentURISuffix());
+
+ SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+ spec.setPushJobSpec(pushJobSpec);
+ spec.setTableSpec(tableSpec);
+ spec.setPinotClusterSpecs(pinotClusterSpecs);
+ if (authContext != null &&
StringUtils.isNotBlank(authContext.getAuthToken())) {
+ spec.setAuthToken(authContext.getAuthToken());
+ }
+ return spec;
+ }
+
+ /**
+ * Creates an instance of the PinotFS using the fileURI and fs properties
from BatchConfig
+ */
+ public static PinotFS getOutputPinotFS(BatchConfig batchConfig, URI fileURI)
{
+ String fileURIScheme = (fileURI == null) ? null : fileURI.getScheme();
+ if (fileURIScheme == null) {
+ fileURIScheme = PinotFSFactory.LOCAL_PINOT_FS_SCHEME;
+ }
+ if (!PinotFSFactory.isSchemeSupported(fileURIScheme)) {
+ registerPinotFS(fileURIScheme, batchConfig.getOutputFsClassName(),
+
IngestionConfigUtils.getOutputFsProps(batchConfig.getBatchConfigMap()));
+ }
+ return PinotFSFactory.create(fileURIScheme);
+ }
+
+ private static void registerPinotFS(String fileURIScheme, String fsClass,
PinotConfiguration fsProps) {
+ PinotFSFactory.register(fileURIScheme, fsClass, fsProps);
+ }
+
/**
* Extracts all fields required by the {@link
org.apache.pinot.spi.data.readers.RecordExtractor} from the given TableConfig
and Schema
* Fields for ingestion come from 2 places:
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java
similarity index 99%
rename from
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
rename to
pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java
index 820f09f..4c254a6 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentPushUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.plugin.ingestion.batch.common;
+package org.apache.pinot.core.util;
import com.google.common.base.Preconditions;
import java.io.File;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java
similarity index 98%
rename from
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java
index ceecc58..19408f4 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/util/SegmentPushUtilsTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pinot.plugin.ingestion.batch.common;
+package org.apache.pinot.core.util;
import java.net.URI;
import org.testng.Assert;
diff --git a/pinot-distribution/pinot-assembly.xml
b/pinot-distribution/pinot-assembly.xml
index 3e1ed79..e117b48 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -140,6 +140,12 @@
<destName>plugins/pinot-segment-writer/pinot-segment-writer-file-based/pinot-segment-writer-file-based-${project.version}-shaded.jar</destName>
</file>
<!-- End Include Pinot Segment Writer Plugins-->
+ <!-- Start Include Pinot Segment Uploader Plugins-->
+ <file>
+
<source>${pinot.root}/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/target/pinot-segment-uploader-default-${project.version}-shaded.jar</source>
+
<destName>plugins/pinot-segment-uploader/pinot-segment-uploader-default/pinot-segment-uploader-default-${project.version}-shaded.jar</destName>
+ </file>
+ <!-- End Include Pinot Segment Uploader Plugins-->
<!-- End Include Pinot Plugins-->
</files>
<fileSets>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 8949e8b..c17bd5f 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -220,6 +220,11 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-segment-uploader-default</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-yammer</artifactId>
<version>${project.version}</version>
</dependency>
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
index 883b0a6..1811f72 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
@@ -23,12 +23,14 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault;
import org.apache.pinot.plugin.segmentwriter.filebased.FileBasedSegmentWriter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -37,6 +39,7 @@ import
org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -85,6 +88,7 @@ public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegration
Map<String, String> batchConfigMap = new HashMap<>();
batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_tarDir.getAbsolutePath());
batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
+ batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
_controllerBaseApiUrl);
return new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"),
null,
null, null);
}
@@ -95,7 +99,7 @@ public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegration
* Checks the number of segments created and total docs from the query
*/
@Test
- public void testFileBasedSegmentWriter()
+ public void testFileBasedSegmentWriterAndDefaultUploader()
throws Exception {
TableConfig offlineTableConfig = createOfflineTableConfig();
@@ -103,6 +107,8 @@ public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegration
SegmentWriter segmentWriter = new FileBasedSegmentWriter();
segmentWriter.init(offlineTableConfig, _schema);
+ SegmentUploader segmentUploader = new SegmentUploaderDefault();
+ segmentUploader.init(offlineTableConfig);
GenericRow reuse = new GenericRow();
long totalDocs = 0;
@@ -110,34 +116,36 @@ public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegration
AvroRecordReader avroRecordReader = new AvroRecordReader();
avroRecordReader.init(_avroFiles.get(i), null, null);
+ long numDocsInSegment = 0;
while (avroRecordReader.hasNext()) {
avroRecordReader.next(reuse);
segmentWriter.collect(reuse);
+ numDocsInSegment++;
totalDocs++;
}
- segmentWriter.flush();
+ // flush to segment
+ URI segmentTarURI = segmentWriter.flush();
+ // upload
+ segmentUploader.uploadSegment(segmentTarURI, null);
+
+ // check num segments
+ Assert.assertEquals(getNumSegments(), i + 1);
+ // check numDocs in latest segment
+ Assert.assertEquals(getNumDocsInLatestSegment(), numDocsInSegment);
+ // check totalDocs in query
+ checkTotalDocsInQuery(totalDocs);
}
segmentWriter.close();
- // Manually upload
- // TODO: once an implementation of SegmentUploader is available, use that
instead
- uploadSegments(_tableNameWithType, _tarDir);
+ dropAllSegments(_tableNameWithType, TableType.OFFLINE);
+ checkNumSegments(0);
+ // upload all together using dir
+ segmentUploader.uploadSegmentsFromDir(_tarDir.toURI(), null);
// check num segments
Assert.assertEquals(getNumSegments(), 3);
- final long expectedDocs = totalDocs;
- TestUtils.waitForCondition(new Function<Void, Boolean>() {
- @Nullable
- @Override
- public Boolean apply(@Nullable Void aVoid) {
- try {
- return getTotalDocsFromQuery() == expectedDocs;
- } catch (Exception e) {
- LOGGER.error("Caught exception when getting totalDocs from query:
{}", e.getMessage());
- return null;
- }
- }
- }, 100L, 120_000, "Failed to load " + expectedDocs + " documents", true);
+ // check totalDocs in query
+ checkTotalDocsInQuery(totalDocs);
dropOfflineTable(_tableNameWithType);
}
@@ -156,6 +164,50 @@ public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegration
return response.get("resultTable").get("rows").get(0).get(0).asInt();
}
+ private int getNumDocsInLatestSegment()
+ throws IOException {
+ String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.
+ forSegmentListAPIWithTableType(_tableNameWithType,
TableType.OFFLINE.toString()));
+ JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
+ JsonNode segments = array.get(0).get("OFFLINE");
+ String segmentName = segments.get(segments.size() - 1).asText();
+
+ jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.
+ forSegmentMetadata(_tableNameWithType, segmentName));
+ JsonNode metadata = JsonUtils.stringToJsonNode(jsonOutputStr);
+ return metadata.get("segment.total.docs").asInt();
+ }
+
+ private void checkTotalDocsInQuery(long expectedTotalDocs) {
+ TestUtils.waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ return getTotalDocsFromQuery() == expectedTotalDocs;
+ } catch (Exception e) {
+ LOGGER.error("Caught exception when getting totalDocs from query:
{}", e.getMessage());
+ return null;
+ }
+ }
+ }, 100L, 120_000, "Failed to load " + expectedTotalDocs + " documents",
true);
+ }
+
+ private void checkNumSegments(int expectedNumSegments) {
+ TestUtils.waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ return getNumSegments() == expectedNumSegments;
+ } catch (Exception e) {
+ LOGGER.error("Caught exception when getting num segments: {}",
e.getMessage());
+ return null;
+ }
+ }
+ }, 100L, 120_000, "Failed to get num segments", true);
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
index 65ae7c4..0d9d5ac 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentMetadataPushJobRunner.java
@@ -26,7 +26,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
index 05aa8f9..d29a3f4 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
@@ -25,7 +25,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
index f451c2c..b563d26 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
@@ -26,7 +26,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
index 5b07db4..bb2ca79 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
@@ -26,7 +26,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
index 1757652..b398bee 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
index 824e05a..2926a73 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
index cad8cf6..f2b2e8d 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentMetadataPushJobRunner.java
@@ -24,7 +24,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
index aa1eee4..641fbdf 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
@@ -25,7 +25,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
index 3172dac..d9a0db7 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
@@ -25,7 +25,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index a1d9a38..313a219 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -30,9 +30,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.util.SegmentPushUtils;
import org.apache.pinot.minion.MinionContext;
import
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -118,6 +118,7 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
resultBuilder.setSegmentName(segmentName);
// Segment push task
+ // TODO: Make this use SegmentUploader
pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs,
outputSegmentTarURI);
resultBuilder.setSucceed(true);
} catch (Exception e) {
diff --git
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml
new file mode 100644
index 0000000..7870427
--- /dev/null
+++
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pinot-segment-uploader</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pinot-segment-uploader-default</artifactId>
+ <name>Pinot Segment Uploader Default</name>
+ <url>https://pinot.apache.org/</url>
+ <properties>
+ <pinot.root>${basedir}/../../..</pinot.root>
+ <phase.prop>package</phase.prop>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
new file mode 100644
index 0000000..5c71575
--- /dev/null
+++
b/pinot-plugins/pinot-segment-uploader/pinot-segment-uploader-default/src/main/java/org/apache/pinot/plugin/segmentuploader/SegmentUploaderDefault.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.segmentuploader;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.core.util.IngestionUtils;
+import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Default implementation of {@link SegmentUploader} with support for all push
modes.
+ * The configs for the push are fetched from the batchConfigMaps of the tableConfig.
+ */
+public class SegmentUploaderDefault implements SegmentUploader {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentUploaderDefault.class);
+
+ private String _tableNameWithType;
+ private BatchConfig _batchConfig;
+
+ @Override
+ public void init(TableConfig tableConfig)
+ throws Exception {
+ _tableNameWithType = tableConfig.getTableName();
+
+ Preconditions.checkState(
+ tableConfig.getIngestionConfig() != null &&
tableConfig.getIngestionConfig().getBatchIngestionConfig() != null
+ && CollectionUtils
+
.isNotEmpty(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()),
+ "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps
in tableConfig for table: %s",
+ _tableNameWithType);
+ Preconditions
+
.checkState(tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().size()
== 1,
+ "batchConfigMaps must contain only 1 BatchConfig for table: %s",
_tableNameWithType);
+ _batchConfig = new BatchConfig(_tableNameWithType,
+
tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps().get(0));
+
+
Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getPushControllerURI()),
+ "Must provide: %s in batchConfigs for table: %s",
BatchConfigProperties.PUSH_CONTROLLER_URI,
+ _tableNameWithType);
+
+ LOGGER.info("Initialized {} for table: {}",
SegmentUploaderDefault.class.getName(), _tableNameWithType);
+ }
+
+ @Override
+ public void uploadSegment(URI segmentTarURI, @Nullable AuthContext
authContext)
+ throws Exception {
+ IngestionUtils
+ .uploadSegment(_tableNameWithType, _batchConfig,
Collections.singletonList(segmentTarURI), authContext);
+ LOGGER.info("Successfully uploaded segment: {} to table: {}",
segmentTarURI, _tableNameWithType);
+ }
+
+ @Override
+ public void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext
authContext)
+ throws Exception {
+
+ List<URI> segmentTarURIs = new ArrayList<>();
+ PinotFS outputPinotFS = IngestionUtils.getOutputPinotFS(_batchConfig,
segmentDir);
+ String[] filePaths = outputPinotFS.listFiles(segmentDir, true);
+ for (String filePath : filePaths) {
+ URI uri = URI.create(filePath);
+ if (!outputPinotFS.isDirectory(uri) &&
filePath.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+ segmentTarURIs.add(uri);
+ }
+ }
+ IngestionUtils.uploadSegment(_tableNameWithType, _batchConfig,
segmentTarURIs, authContext);
+ LOGGER.info("Successfully uploaded segments: {} to table: {}",
segmentTarURIs, _tableNameWithType);
+ }
+}
diff --git a/pinot-plugins/pinot-segment-uploader/pom.xml
b/pinot-plugins/pinot-segment-uploader/pom.xml
new file mode 100644
index 0000000..e5e05fb
--- /dev/null
+++ b/pinot-plugins/pinot-segment-uploader/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pinot-plugins</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <artifactId>pinot-segment-uploader</artifactId>
+ <packaging>pom</packaging>
+ <name>Pinot Segment Uploader</name>
+ <url>https://pinot.apache.org/</url>
+ <properties>
+ <pinot.root>${basedir}/../..</pinot.root>
+ <plugin.type>pinot-segment-uploader</plugin.type>
+ </properties>
+
+ <modules>
+ <module>pinot-segment-uploader-default</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-spi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
index abae7c2..e1c090c 100644
---
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
+++
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
@@ -145,7 +145,7 @@ public class FileBasedSegmentWriter implements
SegmentWriter {
* Successful completion of segment will return the segment URI.
* The buffer will be reset and ready to accept further records via
<code>collect()</code>
*
- * If an exception is throw, the buffer will not be reset
+ * If an exception is thrown, the buffer will not be reset
* and so, <code>flush()</code> can be invoked repeatedly in a retry loop.
* If a successful invocation is not achieved,<code>close()</code> followed
by <code>init</code> will have to be
* called in order to reset the buffer and resume record writing.
diff --git a/pinot-plugins/pom.xml b/pinot-plugins/pom.xml
index bd5efbc..238e02b 100644
--- a/pinot-plugins/pom.xml
+++ b/pinot-plugins/pom.xml
@@ -47,6 +47,7 @@
<module>pinot-minion-tasks</module>
<module>pinot-metrics</module>
<module>pinot-segment-writer</module>
+ <module>pinot-segment-uploader</module>
</modules>
<dependencies>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
new file mode 100644
index 0000000..5a9798c
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/auth/AuthContext.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.auth;
+
+/**
+ * Container for all auth related info
+ */
+public class AuthContext {
+ private final String _authToken;
+
+ public AuthContext(String authToken) {
+ _authToken = authToken;
+ }
+
+ public String getAuthToken() {
+ return _authToken;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
index aed5eec..4495c85 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java
@@ -52,6 +52,15 @@ public class BatchConfig {
private final boolean _excludeSequenceId;
private final String _sequenceId;
+ private final String _pushMode;
+ private final int _pushAttempts;
+ private final int _pushParallelism;
+ private final long _pushIntervalRetryMillis;
+ private final String _pushSegmentURIPrefix;
+ private final String _pushSegmentURISuffix;
+ private final String _pushControllerURI;
+ private final String _outputSegmentDirURI;
+
public BatchConfig(String tableNameWithType, Map<String, String>
batchConfigsMap) {
_batchConfigMap = batchConfigsMap;
_tableNameWithType = tableNameWithType;
@@ -78,7 +87,7 @@ public class BatchConfig {
_recordReaderProps = IngestionConfigUtils
.extractPropsMatchingPrefix(batchConfigsMap,
BatchConfigProperties.RECORD_READER_PROP_PREFIX);
- _segmentNameGeneratorType =
batchConfigsMap.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE);
+ _segmentNameGeneratorType =
IngestionConfigUtils.getSegmentNameGeneratorType(batchConfigsMap);
_segmentNameGeneratorConfigs = IngestionConfigUtils
.extractPropsMatchingPrefix(batchConfigsMap,
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX);
Map<String, String> segmentNameGeneratorProps =
IngestionConfigUtils.getSegmentNameGeneratorProps(batchConfigsMap);
@@ -87,6 +96,15 @@ public class BatchConfig {
_segmentNamePostfix =
segmentNameGeneratorProps.get(BatchConfigProperties.SEGMENT_NAME_POSTFIX);
_excludeSequenceId =
Boolean.parseBoolean(segmentNameGeneratorProps.get(BatchConfigProperties.EXCLUDE_SEQUENCE_ID));
_sequenceId = batchConfigsMap.get(BatchConfigProperties.SEQUENCE_ID);
+
+ _pushMode = IngestionConfigUtils.getPushMode(batchConfigsMap);
+ _pushAttempts = IngestionConfigUtils.getPushAttempts(batchConfigsMap);
+ _pushParallelism =
IngestionConfigUtils.getPushParallelism(batchConfigsMap);
+ _pushIntervalRetryMillis =
IngestionConfigUtils.getPushRetryIntervalMillis(batchConfigsMap);
+ _pushSegmentURIPrefix =
batchConfigsMap.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX);
+ _pushSegmentURISuffix =
batchConfigsMap.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX);
+ _pushControllerURI =
batchConfigsMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI);
+ _outputSegmentDirURI =
batchConfigsMap.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI);
}
public String getTableNameWithType() {
@@ -165,6 +183,38 @@ public class BatchConfig {
return _sequenceId;
}
+ public String getPushMode() {
+ return _pushMode;
+ }
+
+ public int getPushAttempts() {
+ return _pushAttempts;
+ }
+
+ public int getPushParallelism() {
+ return _pushParallelism;
+ }
+
+ public long getPushIntervalRetryMillis() {
+ return _pushIntervalRetryMillis;
+ }
+
+ public String getPushSegmentURIPrefix() {
+ return _pushSegmentURIPrefix;
+ }
+
+ public String getPushSegmentURISuffix() {
+ return _pushSegmentURISuffix;
+ }
+
+ public String getPushControllerURI() {
+ return _pushControllerURI;
+ }
+
+ public String getOutputSegmentDirURI() {
+ return _outputSegmentDirURI;
+ }
+
public Map<String, String> getBatchConfigMap() {
return _batchConfigMap;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index b208814..a82553f 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -50,6 +50,9 @@ public class BatchConfigProperties {
public static final String OVERWRITE_OUTPUT = "overwriteOutput";
public static final String INPUT_DATA_FILE_URI_KEY = "input.data.file.uri";
public static final String PUSH_MODE = "push.mode";
+ public static final String PUSH_ATTEMPTS = "push.attempts";
+ public static final String PUSH_PARALLELISM = "push.parallelism";
+ public static final String PUSH_RETRY_INTERVAL_MILLIS =
"push.retry.interval.millis";
public static final String PUSH_CONTROLLER_URI = "push.controllerUri";
public static final String PUSH_SEGMENT_URI_PREFIX = "push.segmentUriPrefix";
public static final String PUSH_SEGMENT_URI_SUFFIX = "push.segmentUriSuffix";
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
index 64d6cfa..995b597 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/uploader/SegmentUploader.java
@@ -19,7 +19,9 @@
package org.apache.pinot.spi.ingestion.segment.uploader;
import java.net.URI;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceStability;
+import org.apache.pinot.spi.auth.AuthContext;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -39,14 +41,17 @@ public interface SegmentUploader {
/**
* Uploads the segment tar file to the cluster
* @param segmentTarFile URI of segment tar file
+ * @param authContext auth details required to upload pinot segment to
controller
*/
- void uploadSegment(URI segmentTarFile)
+ void uploadSegment(URI segmentTarFile, @Nullable AuthContext authContext)
throws Exception;
/**
- * Uploads the segments from the segmentDir to the cluster
+ * Uploads the segments from the segmentDir to the cluster.
+ * Looks for segmentTar files recursively, with suffix .tar.gz
* @param segmentDir URI of directory containing segment tar files
+ * @param authContext auth details required to upload pinot segment to
controller
*/
- void uploadSegments(URI segmentDir)
+ void uploadSegmentsFromDir(URI segmentDir, @Nullable AuthContext authContext)
throws Exception;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index a1d7530..74f68a9 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
@@ -34,7 +35,12 @@ import
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
*/
public final class IngestionConfigUtils {
public static final String DOT_SEPARATOR = ".";
+ private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
private static final String DEFAULT_PUSH_MODE = "tar";
+ private static final int DEFAULT_PUSH_ATTEMPTS = 5;
+ private static final int DEFAULT_PUSH_PARALLELISM = 1;
+ private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
/**
* Fetches the streamConfig from the given realtime table.
@@ -158,11 +164,51 @@ public final class IngestionConfigUtils {
return props;
}
+ /**
+ * Extracts the segment name generator type from the batchConfigMap, or
returns default value if not found
+ */
+ public static String getSegmentNameGeneratorType(Map<String, String>
batchConfigMap) {
+ return batchConfigMap
+ .getOrDefault(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
DEFAULT_SEGMENT_NAME_GENERATOR_TYPE);
+ }
+
+ /**
+ * Extracts the push mode from the batchConfigMap, or returns default value
if not found
+ */
public static String getPushMode(Map<String, String> batchConfigMap) {
- String pushMode = batchConfigMap.get(BatchConfigProperties.PUSH_MODE);
- if (pushMode == null) {
- pushMode = DEFAULT_PUSH_MODE;
+ return batchConfigMap.getOrDefault(BatchConfigProperties.PUSH_MODE,
DEFAULT_PUSH_MODE);
+ }
+
+ /**
+ * Extracts the push attempts from the batchConfigMap, or returns default
value if not found
+ */
+ public static int getPushAttempts(Map<String, String> batchConfigMap) {
+ String pushAttempts =
batchConfigMap.get(BatchConfigProperties.PUSH_ATTEMPTS);
+ if (StringUtils.isNumeric(pushAttempts)) {
+ return Integer.parseInt(pushAttempts);
+ }
+ return DEFAULT_PUSH_ATTEMPTS;
+ }
+
+ /**
+ * Extracts the push parallelism from the batchConfigMap, or returns default
value if not found
+ */
+ public static int getPushParallelism(Map<String, String> batchConfigMap) {
+ String pushParallelism =
batchConfigMap.get(BatchConfigProperties.PUSH_PARALLELISM);
+ if (StringUtils.isNumeric(pushParallelism)) {
+ return Integer.parseInt(pushParallelism);
+ }
+ return DEFAULT_PUSH_PARALLELISM;
+ }
+
+ /**
+ * Extracts the push retry interval millis from the batchConfigMap, or
returns default value if not found
+ */
+ public static long getPushRetryIntervalMillis(Map<String, String>
batchConfigMap) {
+ String pushRetryIntervalMillis =
batchConfigMap.get(BatchConfigProperties.PUSH_RETRY_INTERVAL_MILLIS);
+ if (StringUtils.isNumeric(pushRetryIntervalMillis)) {
+ return Long.parseLong(pushRetryIntervalMillis);
}
- return pushMode;
+ return DEFAULT_PUSH_RETRY_INTERVAL_MILLIS;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]