This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6976697 Update some usage of BatchConfig (#7459)
6976697 is described below
commit 6976697aeaa3aa30377f958181eb4f2ba6399fd4
Author: Rong Rong <[email protected]>
AuthorDate: Tue Sep 21 16:07:09 2021 -0700
Update some usage of BatchConfig (#7459)
---
.../resources/PinotIngestionRestletResource.java | 4 +--
.../pinot/controller/util/FileIngestionHelper.java | 31 +++++++++++-----------
.../org/apache/pinot/tools/BootstrapTableTool.java | 8 +++---
3 files changed, 20 insertions(+), 23 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index 7c0d882..f35ca49 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -47,7 +47,6 @@ import org.apache.pinot.controller.util.FileIngestionHelper.DataPayload;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -193,11 +192,10 @@ public class PinotIngestionRestletResource {
Map<String, String> batchConfigMap =
JsonUtils.stringToObject(batchConfigMapStr, new
TypeReference<Map<String, String>>() {
});
- BatchConfig batchConfig = new BatchConfig(tableNameWithType,
batchConfigMap);
Schema schema =
_pinotHelixResourceManager.getTableSchema(tableNameWithType);
FileIngestionHelper fileIngestionHelper =
- new FileIngestionHelper(tableConfig, schema, batchConfig,
getControllerUri(),
+ new FileIngestionHelper(tableConfig, schema, batchConfigMap,
getControllerUri(),
new File(_controllerConf.getDataDir(), UPLOAD_DIR),
getAuthToken());
return fileIngestionHelper.buildSegmentAndPush(payload);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index dfe5816..4268ea5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -41,7 +41,6 @@ import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -69,16 +68,16 @@ public class FileIngestionHelper {
private final TableConfig _tableConfig;
private final Schema _schema;
- private final BatchConfig _batchConfig;
+ private final Map<String, String> _batchConfigMap;
private final URI _controllerUri;
private final File _uploadDir;
private final AuthContext _authContext;
- public FileIngestionHelper(TableConfig tableConfig, Schema schema,
BatchConfig batchConfig, URI controllerUri,
- File uploadDir, String authToken) {
+ public FileIngestionHelper(TableConfig tableConfig, Schema schema,
Map<String, String> batchConfigMap,
+ URI controllerUri, File uploadDir, String authToken) {
_tableConfig = tableConfig;
_schema = schema;
- _batchConfig = batchConfig;
+ _batchConfigMap = batchConfigMap;
_controllerUri = controllerUri;
_uploadDir = uploadDir;
_authContext = new AuthContext(authToken);
@@ -100,15 +99,16 @@ public class FileIngestionHelper {
File outputDir = new File(workingDir, OUTPUT_SEGMENT_DIR);
File segmentTarDir = new File(workingDir, SEGMENT_TAR_DIR);
try {
- Preconditions
- .checkState(inputDir.mkdirs(), "Could not create directory for
downloading input file locally: %s", inputDir);
- Preconditions.checkState(segmentTarDir.mkdirs(), "Could not create
directory for segment tar file: %s", inputDir);
+ Preconditions.checkState(inputDir.mkdirs(),
+ "Could not create directory for downloading input file locally: %s",
inputDir);
+ Preconditions.checkState(segmentTarDir.mkdirs(),
+ "Could not create directory for segment tar file: %s", inputDir);
// Copy file to local working dir
- File inputFile = new File(inputDir,
- String.format("%s.%s", DATA_FILE_PREFIX,
_batchConfig.getInputFormat().toString().toLowerCase()));
+ File inputFile = new File(inputDir, String.format(
+ "%s.%s", DATA_FILE_PREFIX,
_batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI).toLowerCase()));
if (payload._payloadType == PayloadType.URI) {
- copyURIToLocal(_batchConfig, payload._uri, inputFile);
+ copyURIToLocal(_batchConfigMap, payload._uri, inputFile);
LOGGER.info("Copied from URI: {} to local file: {}", payload._uri,
inputFile.getAbsolutePath());
} else {
copyMultipartToLocal(payload._multiPart, inputFile);
@@ -116,7 +116,7 @@ public class FileIngestionHelper {
}
// Update batch config map with values for file upload
- Map<String, String> batchConfigMapOverride = new
HashMap<>(_batchConfig.getBatchConfigMap());
+ Map<String, String> batchConfigMapOverride = new
HashMap<>(_batchConfigMap);
batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI,
inputFile.getAbsolutePath());
batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI,
outputDir.getAbsolutePath());
batchConfigMapOverride.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
_controllerUri.toString());
@@ -170,12 +170,13 @@ public class FileIngestionHelper {
/**
* Copy the file from given URI to local file
*/
- public static void copyURIToLocal(BatchConfig batchConfig, URI
sourceFileURI, File destFile)
+ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI
sourceFileURI, File destFile)
throws Exception {
String sourceFileURIScheme = sourceFileURI.getScheme();
if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
- PinotFSFactory.register(sourceFileURIScheme,
batchConfig.getInputFsClassName(),
- IngestionConfigUtils.getInputFsProps(batchConfig.getInputFsProps()));
+ PinotFSFactory.register(sourceFileURIScheme,
batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
+
IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
+ batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
}
PinotFSFactory.create(sourceFileURIScheme).copyToLocalFile(sourceFileURI,
destFile);
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
index 00eb71f..0534dca 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/BootstrapTableTool.java
@@ -34,7 +34,6 @@ import org.apache.pinot.common.minion.MinionClient;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.util.TlsUtils;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
@@ -128,7 +127,7 @@ public class BootstrapTableTool {
JsonUtils.inputStreamToObject(new
FileInputStream(offlineTableConfigFile), TableConfig.class);
if (tableConfig.getIngestionConfig() != null
&& tableConfig.getIngestionConfig().getBatchIngestionConfig() != null)
{
- updatedTableConfig(tableConfig, tableName, setupTableTmpDir);
+ updatedTableConfig(tableConfig, setupTableTmpDir);
}
LOGGER.info("Adding offline table: {}", tableName);
@@ -190,13 +189,12 @@ public class BootstrapTableTool {
return true;
}
- private void updatedTableConfig(TableConfig tableConfig, String tableName,
File setupTableTmpDir)
+ private void updatedTableConfig(TableConfig tableConfig, File
setupTableTmpDir)
throws Exception {
final List<Map<String, String>> batchConfigsMaps =
tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
for (Map<String, String> batchConfigsMap : batchConfigsMaps) {
- BatchConfig batchConfig = new
BatchConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName),
batchConfigsMap);
- String inputDirURI = batchConfig.getInputDirURI();
+ String inputDirURI =
batchConfigsMap.get(BatchConfigProperties.INPUT_DIR_URI);
if (!new File(inputDirURI).exists()) {
URL resolvedInputDirURI =
BootstrapTableTool.class.getClassLoader().getResource(inputDirURI);
if (resolvedInputDirURI != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]