icefury71 commented on a change in pull request #6718:
URL: https://github.com/apache/incubator-pinot/pull/6718#discussion_r603776066
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -18,25 +18,210 @@
*/
package org.apache.pinot.core.util;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Utility methods for extracting source and destination fields from ingestion
configs
+ * Helper methods for ingestion
*/
-public class IngestionUtils {
+public final class IngestionUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionUtils.class);
+
+ private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
+ private static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+ private static final int DEFAULT_ATTEMPTS = 3;
+ private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
+
+ private IngestionUtils() {
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig and schema.
+ * All properties are taken from the 1st Map in tableConfig ->
ingestionConfig -> batchIngestionConfig -> batchConfigMaps
+ * @param tableConfig tableConfig with the batchConfigMap set
+ * @param schema pinot schema
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema)
+ throws IOException, ClassNotFoundException {
+ Preconditions.checkNotNull(tableConfig.getIngestionConfig(),
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ return generateSegmentGeneratorConfig(tableConfig, schema,
+ tableConfig.getIngestionConfig().getBatchIngestionConfig());
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig, schema and
batchIngestionConfig.
+ * The provided batchIngestionConfig will take precedence over the one in
tableConfig
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema,
+ BatchIngestionConfig batchIngestionConfig)
+ throws ClassNotFoundException, IOException {
+ Preconditions.checkNotNull(batchIngestionConfig,
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+
Preconditions.checkState(CollectionUtils.isNotEmpty(batchIngestionConfig.getBatchConfigMaps()),
+ "Must provide batchConfigMap in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ BatchConfig batchConfig =
+ new BatchConfig(tableConfig.getTableName(),
batchIngestionConfig.getBatchConfigMaps().get(0));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
+
+ // Input/output configs
+ segmentGeneratorConfig.setInputFilePath(batchConfig.getInputDirURI());
+ segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI());
+
+ // Reader configs
+ segmentGeneratorConfig
+
.setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString()));
+ Map<String, String> recordReaderProps = batchConfig.getRecordReaderProps();
+
segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(),
+ IngestionConfigUtils.getRecordReaderProps(recordReaderProps)));
+
+ // Segment name generator configs
+ SegmentNameGenerator segmentNameGenerator =
+ getSegmentNameGenerator(batchConfig,
batchIngestionConfig.getSegmentIngestionType(),
+ batchIngestionConfig.getSegmentIngestionFrequency(), tableConfig,
schema);
+ segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+ String sequenceId = batchConfig.getSequenceId();
+ if (StringUtils.isNumeric(sequenceId)) {
+ segmentGeneratorConfig.setSequenceId(Integer.parseInt(sequenceId));
+ }
+
+ return segmentGeneratorConfig;
+ }
+
+ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig
batchConfig, String pushType,
+ String pushFrequency, TableConfig tableConfig, Schema schema) {
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType());
+ String segmentNameGeneratorType =
batchConfig.getSegmentNameGeneratorType();
+ if (segmentNameGeneratorType == null) {
+ segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE;
+ }
+ switch (segmentNameGeneratorType) {
+ case BatchConfigProperties.SegmentNameGeneratorType.FIXED:
+ return new FixedSegmentNameGenerator(batchConfig.getSegmentName());
+
+ case BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE:
+ DateTimeFormatSpec dateTimeFormatSpec = null;
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumnName != null) {
+ DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ if (dateTimeFieldSpec != null) {
+ dateTimeFormatSpec = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+ }
+ }
+ return new NormalizedDateSegmentNameGenerator(rawTableName,
batchConfig.getSegmentNamePrefix(),
+ batchConfig.isExcludeSequenceId(), pushType, pushFrequency,
dateTimeFormatSpec);
+
+ case BatchConfigProperties.SegmentNameGeneratorType.SIMPLE:
+ return new SimpleSegmentNameGenerator(rawTableName,
batchConfig.getSegmentNamePostfix());
+
+ default:
+ throw new IllegalStateException(String
+ .format("Unsupported segmentNameGeneratorType: %s for table: %s",
segmentNameGeneratorType,
+ tableConfig.getTableName()));
+ }
+ }
+
+ /**
+ * Builds a segment using given {@link SegmentGeneratorConfig}
+ * @return segment name
+ */
+ public static String buildSegment(SegmentGeneratorConfig
segmentGeneratorConfig)
+ throws Exception {
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ return driver.getSegmentName();
+ }
+
+ /**
+ * Uploads the segment tar files to the provided controller
+ */
+ public static void uploadSegment(String tableNameWithType, List<File>
tarFiles, URI controllerUri, String authToken)
Review comment:
nit: `authToken` should probably be declared `final`.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -18,25 +18,210 @@
*/
package org.apache.pinot.core.util;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Utility methods for extracting source and destination fields from ingestion
configs
+ * Helper methods for ingestion
*/
-public class IngestionUtils {
+public final class IngestionUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionUtils.class);
+
+ private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
+ private static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+ private static final int DEFAULT_ATTEMPTS = 3;
+ private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
+
+ private IngestionUtils() {
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig and schema.
+ * All properties are taken from the 1st Map in tableConfig ->
ingestionConfig -> batchIngestionConfig -> batchConfigMaps
+ * @param tableConfig tableConfig with the batchConfigMap set
+ * @param schema pinot schema
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema)
+ throws IOException, ClassNotFoundException {
+ Preconditions.checkNotNull(tableConfig.getIngestionConfig(),
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ return generateSegmentGeneratorConfig(tableConfig, schema,
+ tableConfig.getIngestionConfig().getBatchIngestionConfig());
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig, schema and
batchIngestionConfig.
+ * The provided batchIngestionConfig will take precedence over the one in
tableConfig
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema,
+ BatchIngestionConfig batchIngestionConfig)
+ throws ClassNotFoundException, IOException {
+ Preconditions.checkNotNull(batchIngestionConfig,
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+
Preconditions.checkState(CollectionUtils.isNotEmpty(batchIngestionConfig.getBatchConfigMaps()),
+ "Must provide batchConfigMap in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ BatchConfig batchConfig =
+ new BatchConfig(tableConfig.getTableName(),
batchIngestionConfig.getBatchConfigMaps().get(0));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
+
+ // Input/output configs
+ segmentGeneratorConfig.setInputFilePath(batchConfig.getInputDirURI());
+ segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI());
+
+ // Reader configs
+ segmentGeneratorConfig
+
.setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString()));
+ Map<String, String> recordReaderProps = batchConfig.getRecordReaderProps();
+
segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(),
+ IngestionConfigUtils.getRecordReaderProps(recordReaderProps)));
+
+ // Segment name generator configs
+ SegmentNameGenerator segmentNameGenerator =
+ getSegmentNameGenerator(batchConfig,
batchIngestionConfig.getSegmentIngestionType(),
+ batchIngestionConfig.getSegmentIngestionFrequency(), tableConfig,
schema);
+ segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+ String sequenceId = batchConfig.getSequenceId();
+ if (StringUtils.isNumeric(sequenceId)) {
+ segmentGeneratorConfig.setSequenceId(Integer.parseInt(sequenceId));
+ }
+
+ return segmentGeneratorConfig;
+ }
+
+ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig
batchConfig, String pushType,
+ String pushFrequency, TableConfig tableConfig, Schema schema) {
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType());
+ String segmentNameGeneratorType =
batchConfig.getSegmentNameGeneratorType();
+ if (segmentNameGeneratorType == null) {
+ segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE;
+ }
+ switch (segmentNameGeneratorType) {
+ case BatchConfigProperties.SegmentNameGeneratorType.FIXED:
+ return new FixedSegmentNameGenerator(batchConfig.getSegmentName());
+
+ case BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE:
+ DateTimeFormatSpec dateTimeFormatSpec = null;
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumnName != null) {
+ DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ if (dateTimeFieldSpec != null) {
+ dateTimeFormatSpec = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+ }
+ }
+ return new NormalizedDateSegmentNameGenerator(rawTableName,
batchConfig.getSegmentNamePrefix(),
+ batchConfig.isExcludeSequenceId(), pushType, pushFrequency,
dateTimeFormatSpec);
+
+ case BatchConfigProperties.SegmentNameGeneratorType.SIMPLE:
+ return new SimpleSegmentNameGenerator(rawTableName,
batchConfig.getSegmentNamePostfix());
+
+ default:
+ throw new IllegalStateException(String
+ .format("Unsupported segmentNameGeneratorType: %s for table: %s",
segmentNameGeneratorType,
+ tableConfig.getTableName()));
+ }
+ }
+
+ /**
+ * Builds a segment using given {@link SegmentGeneratorConfig}
+ * @return segment name
+ */
+ public static String buildSegment(SegmentGeneratorConfig
segmentGeneratorConfig)
+ throws Exception {
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ return driver.getSegmentName();
+ }
+
+ /**
+ * Uploads the segment tar files to the provided controller
+ */
+ public static void uploadSegment(String tableNameWithType, List<File>
tarFiles, URI controllerUri, String authToken)
+ throws RetriableOperationException, AttemptsExceededException {
+ for (File tarFile : tarFiles) {
+ String fileName = tarFile.getName();
+ Preconditions
+
.checkArgument(fileName.endsWith(org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT));
Review comment:
nit: use a static import for readability?
##########
File path:
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.plugin.segmentwriter.filebased.FileBasedSegmentWriter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests creating segments via the {@link SegmentWriter} implementations
+ */
+public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegrationTest {
+
+ private Schema _schema;
+ private String _tableNameWithType;
+ private List<File> _avroFiles;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ // Create and upload the schema
+ _schema = createSchema();
+ addSchema(_schema);
+ _tableNameWithType =
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(getTableName());
+
+ // Get avro files
+ _avroFiles = getAllAvroFiles();
+ }
+
+ @Nullable
+ protected IngestionConfig getIngestionConfig() {
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_tarDir.getAbsolutePath());
+ batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
+ return new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"),
null,
+ null, null);
+ }
+
+ /**
+ * Write the records from 3 avro files into the Pinot table using the {@link
FileBasedSegmentWriter}
+ * Calls {@link SegmentWriter#flush()} after writing records from each avro
file
+ * Checks the number of segments created and total docs from the query
+ */
+ @Test
+ public void testFileBasedSegmentWriter()
+ throws Exception {
+
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+
+ SegmentWriter segmentWriter = new FileBasedSegmentWriter();
+ segmentWriter.init(offlineTableConfig, _schema);
+
+ GenericRow reuse = new GenericRow();
+ long totalDocs = 0;
+ for (int i = 0; i < 3; i++) {
+ AvroRecordReader avroRecordReader = new AvroRecordReader();
+ avroRecordReader.init(_avroFiles.get(i), null, null);
+
+ while (avroRecordReader.hasNext()) {
+ avroRecordReader.next(reuse);
+ segmentWriter.collect(reuse);
+ totalDocs++;
+ }
+ segmentWriter.flush();
+ }
+ segmentWriter.close();
+
+ // Manually upload
+ // TODO: once an implementation of SegmentUploader is available, use that
instead
+ uploadSegments(_tableNameWithType, _tarDir);
+
+ // check num segments
+ Assert.assertEquals(getNumSegments(), 3);
+ final long expectedDocs = totalDocs;
+ TestUtils.waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ return getTotalDocsFromQuery() == expectedDocs;
+ } catch (Exception e) {
+ return null;
Review comment:
It might be good to log something here (at the risk of being too noisy).
##########
File path:
pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
##########
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.segmentwriter.filebased;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
+import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.util.IngestionUtils;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link SegmentWriter} implementation that uses a local file as a buffer
to collect {@link GenericRow}.
+ * The {@link GenericRow} are written to the buffer as AVRO records.
+ */
+@NotThreadSafe
+public class FileBasedSegmentWriter implements SegmentWriter {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileBasedSegmentWriter.class);
+ private static final FileFormat BUFFER_FILE_FORMAT = FileFormat.AVRO;
+
+ private TableConfig _tableConfig;
+ private String _tableNameWithType;
+ private BatchIngestionConfig _batchIngestionConfig;
+ private BatchConfig _batchConfig;
+ private String _outputDirURI;
+ private Schema _schema;
+ private Set<String> _fieldsToRead;
+ private RecordTransformer _recordTransformer;
+
+ private File _stagingDir;
+ private File _bufferFile;
+
+ private org.apache.avro.Schema _avroSchema;
+ private DataFileWriter<GenericData.Record> _recordWriter;
+ private GenericData.Record _reusableRecord;
+
+ @Override
+ public void init(TableConfig tableConfig, Schema schema)
+ throws Exception {
+ _tableConfig = tableConfig;
+ _tableNameWithType = _tableConfig.getTableName();
+
+ Preconditions.checkState(
+ _tableConfig.getIngestionConfig() != null &&
_tableConfig.getIngestionConfig().getBatchIngestionConfig() != null
+ && CollectionUtils
+
.isNotEmpty(_tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()),
+ "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps
in tableConfig for table: %s",
+ _tableNameWithType);
+ _batchIngestionConfig =
_tableConfig.getIngestionConfig().getBatchIngestionConfig();
+ Preconditions.checkState(_batchIngestionConfig.getBatchConfigMaps().size()
== 1,
+ "batchConfigMaps must contain only 1 BatchConfig for table: %s",
_tableNameWithType);
+ _batchConfig = new BatchConfig(_tableNameWithType,
_batchIngestionConfig.getBatchConfigMaps().get(0));
+
+
Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getOutputDirURI()),
+ "Must provide: %s in batchConfigs for table: %s",
BatchConfigProperties.OUTPUT_DIR_URI, _tableNameWithType);
+ _outputDirURI = _batchConfig.getOutputDirURI();
+ Files.createDirectories(Paths.get(_outputDirURI));
+
+ _schema = schema;
+ _fieldsToRead = _schema.getColumnNames();
+ _recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
+ _avroSchema =
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
+ _reusableRecord = new GenericData.Record(_avroSchema);
+
+ // Create tmp dir
+ _stagingDir = new File(FileUtils.getTempDirectory(),
+ String.format("segment_writer_staging_%s_%d", _tableNameWithType,
System.currentTimeMillis()));
+ Preconditions.checkState(_stagingDir.mkdirs(), "Failed to create staging
dir: %s", _stagingDir.getAbsolutePath());
+
+ // Create buffer file
+ File bufferDir = new File(_stagingDir, "buffer_dir");
+ Preconditions.checkState(bufferDir.mkdirs(), "Failed to create buffer_dir:
%s", bufferDir.getAbsolutePath());
+ _bufferFile = new File(bufferDir, "buffer_file");
+ resetBuffer();
+ LOGGER.info("Initialized {} for table: {}",
FileBasedSegmentWriter.class.getName(), _tableNameWithType);
+ }
+
+ private void resetBuffer()
+ throws IOException {
+ FileUtils.deleteQuietly(_bufferFile);
+ _recordWriter = new DataFileWriter<>(new
GenericDatumWriter<>(_avroSchema));
+ _recordWriter.create(_avroSchema, _bufferFile);
+ }
+
+ @Override
+ public void collect(GenericRow row)
+ throws IOException {
+ GenericRow transform = _recordTransformer.transform(row);
+ SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform,
_reusableRecord, _fieldsToRead);
+ _recordWriter.append(_reusableRecord);
+ }
+
+ @Override
+ public URI flush()
+ throws IOException {
+
+ LOGGER.info("Beginning flush for table: {}", _tableNameWithType);
+ _recordWriter.close();
+
+ // Create temp dir for flush
+ File flushDir = new File(_stagingDir, "flush_dir_" +
System.currentTimeMillis());
+ Preconditions.checkState(flushDir.mkdirs(), "Failed to create flush dir:
%s", flushDir);
+
+ try {
+ // Segment dir
+ File segmentDir = new File(flushDir, "segment_dir");
+
+ // Make BatchIngestionConfig for flush
+ Map<String, String> batchConfigMapOverride = new
HashMap<>(_batchConfig.getBatchConfigMap());
+ batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI,
_bufferFile.getAbsolutePath());
+ batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI,
segmentDir.getAbsolutePath());
+ batchConfigMapOverride.put(BatchConfigProperties.INPUT_FORMAT,
BUFFER_FILE_FORMAT.toString());
+ BatchIngestionConfig batchIngestionConfig = new
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
+ _batchIngestionConfig.getSegmentIngestionType(),
_batchIngestionConfig.getSegmentIngestionFrequency());
+
+ // Build segment
+ SegmentGeneratorConfig segmentGeneratorConfig =
+ IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema,
batchIngestionConfig);
+ String segmentName = IngestionUtils.buildSegment(segmentGeneratorConfig);
+ LOGGER.info("Successfully built segment: {} for table: {}", segmentName,
_tableNameWithType);
+
+ // Tar segment
+ File segmentTarFile = new File(_outputDirURI, segmentName +
Constants.TAR_GZ_FILE_EXT);
+ if (!_batchConfig.isOverwriteOutput() && segmentTarFile.exists()) {
+ segmentTarFile = new File(_outputDirURI,
+ String.format("%s_%d%s", segmentName, System.currentTimeMillis(),
Constants.TAR_GZ_FILE_EXT));
+ }
+ TarGzCompressionUtils.createTarGzFile(new File(segmentDir, segmentName),
segmentTarFile);
+ LOGGER.info("Created segment tar: {} for segment: {} of table: {}",
segmentTarFile.getAbsolutePath(), segmentName,
+ _tableNameWithType);
+
+ // Reset buffer and return segmentTar URI
+ resetBuffer();
Review comment:
    Should `resetBuffer` be in a `finally` block, given that we've already closed `_recordWriter`?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -18,25 +18,210 @@
*/
package org.apache.pinot.core.util;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Utility methods for extracting source and destination fields from ingestion
configs
+ * Helper methods for ingestion
*/
-public class IngestionUtils {
+public final class IngestionUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionUtils.class);
+
+ private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
+ private static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+ private static final int DEFAULT_ATTEMPTS = 3;
+ private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
+
+ private IngestionUtils() {
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig and schema.
+ * All properties are taken from the 1st Map in tableConfig ->
ingestionConfig -> batchIngestionConfig -> batchConfigMaps
+ * @param tableConfig tableConfig with the batchConfigMap set
+ * @param schema pinot schema
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema)
+ throws IOException, ClassNotFoundException {
+ Preconditions.checkNotNull(tableConfig.getIngestionConfig(),
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ return generateSegmentGeneratorConfig(tableConfig, schema,
+ tableConfig.getIngestionConfig().getBatchIngestionConfig());
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig, schema and
batchIngestionConfig.
+ * The provided batchIngestionConfig will take precedence over the one in
tableConfig
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema,
+ BatchIngestionConfig batchIngestionConfig)
+ throws ClassNotFoundException, IOException {
+ Preconditions.checkNotNull(batchIngestionConfig,
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+
Preconditions.checkState(CollectionUtils.isNotEmpty(batchIngestionConfig.getBatchConfigMaps()),
+ "Must provide batchConfigMap in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ BatchConfig batchConfig =
+ new BatchConfig(tableConfig.getTableName(),
batchIngestionConfig.getBatchConfigMaps().get(0));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
+
+ // Input/output configs
+ segmentGeneratorConfig.setInputFilePath(batchConfig.getInputDirURI());
+ segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI());
+
+ // Reader configs
+ segmentGeneratorConfig
+
.setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString()));
+ Map<String, String> recordReaderProps = batchConfig.getRecordReaderProps();
+
segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(),
+ IngestionConfigUtils.getRecordReaderProps(recordReaderProps)));
+
+ // Segment name generator configs
+ SegmentNameGenerator segmentNameGenerator =
+ getSegmentNameGenerator(batchConfig,
batchIngestionConfig.getSegmentIngestionType(),
+ batchIngestionConfig.getSegmentIngestionFrequency(), tableConfig,
schema);
+ segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+ String sequenceId = batchConfig.getSequenceId();
+ if (StringUtils.isNumeric(sequenceId)) {
+ segmentGeneratorConfig.setSequenceId(Integer.parseInt(sequenceId));
+ }
+
+ return segmentGeneratorConfig;
+ }
+
+ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig
batchConfig, String pushType,
+ String pushFrequency, TableConfig tableConfig, Schema schema) {
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType());
+ String segmentNameGeneratorType =
batchConfig.getSegmentNameGeneratorType();
+ if (segmentNameGeneratorType == null) {
+ segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE;
+ }
+ switch (segmentNameGeneratorType) {
+ case BatchConfigProperties.SegmentNameGeneratorType.FIXED:
+ return new FixedSegmentNameGenerator(batchConfig.getSegmentName());
+
+ case BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE:
+ DateTimeFormatSpec dateTimeFormatSpec = null;
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumnName != null) {
+ DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ if (dateTimeFieldSpec != null) {
+ dateTimeFormatSpec = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+ }
+ }
+ return new NormalizedDateSegmentNameGenerator(rawTableName,
batchConfig.getSegmentNamePrefix(),
+ batchConfig.isExcludeSequenceId(), pushType, pushFrequency,
dateTimeFormatSpec);
+
+ case BatchConfigProperties.SegmentNameGeneratorType.SIMPLE:
+ return new SimpleSegmentNameGenerator(rawTableName,
batchConfig.getSegmentNamePostfix());
+
+ default:
+ throw new IllegalStateException(String
+ .format("Unsupported segmentNameGeneratorType: %s for table: %s",
segmentNameGeneratorType,
+ tableConfig.getTableName()));
+ }
+ }
+
+ /**
+ * Builds a segment using given {@link SegmentGeneratorConfig}
+ * @return segment name
+ */
+ public static String buildSegment(SegmentGeneratorConfig
segmentGeneratorConfig)
+ throws Exception {
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ return driver.getSegmentName();
+ }
+
+ /**
+ * Uploads the segment tar files to the provided controller
+ */
+ public static void uploadSegment(String tableNameWithType, List<File>
tarFiles, URI controllerUri, String authToken)
+ throws RetriableOperationException, AttemptsExceededException {
+ for (File tarFile : tarFiles) {
+ String fileName = tarFile.getName();
+ Preconditions
+
.checkArgument(fileName.endsWith(org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT));
+ String segmentName = fileName.substring(0,
+ fileName.length() -
org.apache.pinot.spi.ingestion.batch.spec.Constants.TAR_GZ_FILE_EXT.length());
+
+ RetryPolicies.exponentialBackoffRetryPolicy(DEFAULT_ATTEMPTS,
DEFAULT_RETRY_WAIT_MS, 5).attempt(() -> {
+ try (InputStream inputStream = new FileInputStream(tarFile)) {
+ SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
+
.uploadSegment(FileUploadDownloadClient.getUploadSegmentURI(controllerUri),
segmentName, inputStream,
+ FileUploadDownloadClient.makeAuthHeader(authToken),
+ FileUploadDownloadClient.makeTableParam(tableNameWithType),
+ FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS);
+ LOGGER.info("Response for pushing table {} segment {} - {}: {}",
tableNameWithType, segmentName,
+ response.getStatusCode(), response.getResponse());
+ return true;
+ } catch (HttpErrorStatusException e) {
+ int statusCode = e.getStatusCode();
+ if (statusCode >= 500) {
+ LOGGER.warn("Caught temporary exception while pushing table: {}
segment: {}, will retry", tableNameWithType,
+ segmentName, e);
+ return false;
+ } else {
+ LOGGER
Review comment:
Isn't this the log-and-throw anti-pattern? Consider logging only at the layer that ultimately handles the exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]