This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f49328b Add segment merge-rollup task executor (#5587)
f49328b is described below
commit f49328bf1d6fcc04eb73ed8a6c4555c231853fe9
Author: Seunghyun Lee <[email protected]>
AuthorDate: Tue Jun 30 00:08:44 2020 -0700
Add segment merge-rollup task executor (#5587)
* Add segment merge-rollup task executor
1. add segment merge-rollup task executor
2. add the unit test for task executor
* Updated the code format based on Pinot style
---
.../apache/pinot/core/common/MinionConstants.java | 6 +
.../apache/pinot/core/minion/SegmentConverter.java | 27 ++---
.../minion/rollup/MergeRollupSegmentConverter.java | 22 ++--
.../impl/SegmentIndexCreationDriverImpl.java | 4 +-
.../minion/MergeRollupSegmentConverterTest.java | 18 ++-
.../minion/executor/MergeRollupTaskExecutor.java | 80 ++++++++++++
.../executor/MergeRollupTaskExecutorFactory.java | 26 ++++
.../executor/TaskExecutorFactoryRegistry.java | 1 +
.../executor/MergeRollupTaskExecutorTest.java | 134 +++++++++++++++++++++
.../segment/converter/SegmentMergeCommand.java | 6 +-
10 files changed, 281 insertions(+), 43 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 4b99b56..b8ff4df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -55,4 +55,10 @@ public class MinionConstants {
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
}
+
+ public static class MergeRollupTask {
+ public static final String TASK_TYPE = "mergeRollupTask";
+ public static final String MERGE_TYPE_KEY = "mergeTypeKey";
+ public static final String MERGED_SEGMENT_NAME_KEY =
"mergedSegmentNameKey";
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java
index c399505..77b5b49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java
@@ -71,7 +71,6 @@ public class SegmentConverter {
private RecordAggregator _recordAggregator;
private List<String> _groupByColumns;
private boolean _skipTimeValueCheck;
- private IndexingConfig _indexingConfig;
public SegmentConverter(List<File> inputIndexDirs, File workingDir, String
tableName, String segmentName,
int totalNumPartition, RecordTransformer recordTransformer, @Nullable
RecordPartitioner recordPartitioner,
@@ -90,7 +89,6 @@ public class SegmentConverter {
_recordAggregator = recordAggregator;
_groupByColumns = groupByColumns;
_skipTimeValueCheck = skipTimeValueCheck;
- _indexingConfig = tableConfig.getIndexingConfig();
}
public List<File> convertSegment()
@@ -121,20 +119,19 @@ public class SegmentConverter {
}
// Sorting on sorted column and creating indices
- if (_indexingConfig != null) {
- List<String> sortedColumn = _indexingConfig.getSortedColumn();
- List<String> invertedIndexColumns =
_indexingConfig.getInvertedIndexColumns();
-
- // Check if the table config has any index configured
- if (CollectionUtils.isNotEmpty(sortedColumn) ||
CollectionUtils.isNotEmpty(invertedIndexColumns)) {
- String indexGenerationOutputPath = _workingDir.getPath() +
File.separator + INDEX_PREFIX + currentPartition;
- try (PinotSegmentRecordReader pinotSegmentRecordReader = new
PinotSegmentRecordReader(outputSegment, null,
- sortedColumn)) {
- buildSegment(indexGenerationOutputPath, outputSegmentName,
pinotSegmentRecordReader,
- pinotSegmentRecordReader.getSchema(), _tableConfig);
- }
- outputSegment = new File(indexGenerationOutputPath + File.separator
+ outputSegmentName);
+ IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+ List<String> sortedColumn = indexingConfig.getSortedColumn();
+ List<String> invertedIndexColumns =
indexingConfig.getInvertedIndexColumns();
+
+ // Check if the table config has any index configured
+ if (CollectionUtils.isNotEmpty(sortedColumn) ||
CollectionUtils.isNotEmpty(invertedIndexColumns)) {
+ String indexGenerationOutputPath = _workingDir.getPath() +
File.separator + INDEX_PREFIX + currentPartition;
+ try (PinotSegmentRecordReader pinotSegmentRecordReader = new
PinotSegmentRecordReader(outputSegment, null,
+ sortedColumn)) {
+ buildSegment(indexGenerationOutputPath, outputSegmentName,
pinotSegmentRecordReader,
+ pinotSegmentRecordReader.getSchema(), _tableConfig);
}
+ outputSegment = new File(indexGenerationOutputPath + File.separator +
outputSegmentName);
}
resultFiles.add(outputSegment);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
index 9e79070..ac3230e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
@@ -22,18 +22,15 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.core.minion.SegmentConverter;
import org.apache.pinot.core.minion.segment.RecordAggregator;
import org.apache.pinot.core.minion.segment.RecordTransformer;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -49,18 +46,17 @@ public class MergeRollupSegmentConverter {
private String _tableName;
private String _segmentName;
private MergeType _mergeType;
- private Map<String, String> _rolllupPreAggregateType;
+ private Map<String, String> _rollupPreAggregateType;
- private MergeRollupSegmentConverter(@Nonnull List<File> inputIndexDirs,
@Nonnull File workingDir,
- @Nonnull String tableName, @Nonnull String segmentName, @Nonnull String
mergeType,
- @Nullable Map<String, String> rollupPreAggregateType, TableConfig
tableConfig) {
+ private MergeRollupSegmentConverter(List<File> inputIndexDirs, File
workingDir, String tableName, String segmentName,
+ MergeType mergeType, TableConfig tableConfig, @Nullable Map<String,
String> rollupPreAggregateType) {
_inputIndexDirs = inputIndexDirs;
_workingDir = workingDir;
_tableName = tableName;
_segmentName = segmentName;
- _mergeType = MergeType.fromString(mergeType);
- _rolllupPreAggregateType = rollupPreAggregateType;
+ _mergeType = mergeType;
_tableConfig = tableConfig;
+ _rollupPreAggregateType = rollupPreAggregateType;
}
public List<File> convert()
@@ -122,7 +118,7 @@ public class MergeRollupSegmentConverter {
RecordTransformer rollupRecordTransformer = (row) -> row;
// Initialize roll-up record aggregator
- RecordAggregator rollupRecordAggregator = new
RollupRecordAggregator(schema, _rolllupPreAggregateType);
+ RecordAggregator rollupRecordAggregator = new
RollupRecordAggregator(schema, _rollupPreAggregateType);
SegmentConverter rollupSegmentConverter =
new
SegmentConverter.Builder().setTableName(_tableName).setSegmentName(_segmentName)
@@ -137,7 +133,7 @@ public class MergeRollupSegmentConverter {
// Required
private List<File> _inputIndexDirs;
private File _workingDir;
- private String _mergeType;
+ private MergeType _mergeType;
private String _tableName;
private String _segmentName;
private TableConfig _tableConfig;
@@ -155,7 +151,7 @@ public class MergeRollupSegmentConverter {
return this;
}
- public Builder setMergeType(String mergeType) {
+ public Builder setMergeType(MergeType mergeType) {
_mergeType = mergeType;
return this;
}
@@ -182,7 +178,7 @@ public class MergeRollupSegmentConverter {
public MergeRollupSegmentConverter build() {
return new MergeRollupSegmentConverter(_inputIndexDirs, _workingDir,
_tableName, _segmentName, _mergeType,
- _rollupPreAggregateType, _tableConfig);
+ _tableConfig, _rollupPreAggregateType);
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index ebb8c72..10fa29a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -257,7 +257,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
FileUtils.deleteQuietly(tempIndexDir);
// Convert segment format if necessary
- convertFormatIfNeeded(segmentOutputDir);
+ convertFormatIfNecessary(segmentOutputDir);
// Build star-tree V2 if necessary
buildStarTreeV2IfNecessary(segmentOutputDir);
@@ -316,7 +316,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
// Using converter is similar to option (2), plus it's battle-tested code.
We will roll out with
// this change to keep changes limited. Once we've migrated we can implement
approach (1) with option to
// copy for indexes for which we don't know sizes upfront.
- private void convertFormatIfNeeded(File segmentDirectory)
+ private void convertFormatIfNecessary(File segmentDirectory)
throws Exception {
SegmentVersion versionToGenerate = config.getSegmentVersion();
if (versionToGenerate.equals(SegmentVersion.v1)) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java
index 412f1c1..81430d1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java
@@ -25,13 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.minion.rollup.MergeType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
@@ -85,11 +83,11 @@ public class MergeRollupSegmentConverterTest {
for (int i = 0; i < NUM_ROWS; i++) {
int dimensionValue = i % (NUM_ROWS / REPEAT_ROWS);
GenericRow row = new GenericRow();
- row.putField(D1, dimensionValue);
- row.putField(D2, Integer.toString(dimensionValue));
- row.putField(M1, (long) dimensionValue);
- row.putField(M2, (double) dimensionValue);
- row.putField(T, timestamp++);
+ row.putValue(D1, dimensionValue);
+ row.putValue(D2, Integer.toString(dimensionValue));
+ row.putValue(M1, (long) dimensionValue);
+ row.putValue(M2, (double) dimensionValue);
+ row.putValue(T, timestamp++);
rows.add(row);
}
@@ -113,7 +111,7 @@ public class MergeRollupSegmentConverterTest {
// Run roll-up segment converter with "CONCATENATE" merge type
MergeRollupSegmentConverter rollupSegmentConverter =
new
MergeRollupSegmentConverter.Builder().setInputIndexDirs(_segmentIndexDirList).setWorkingDir(WORKING_DIR)
-
.setTableName(TABLE_NAME).setSegmentName("TestConcatenate").setMergeType("CONCATENATE")
+
.setTableName(TABLE_NAME).setSegmentName("TestConcatenate").setMergeType(MergeType.CONCATENATE)
.setTableConfig(_tableConfig).build();
List<File> result = rollupSegmentConverter.convert();
Assert.assertEquals(result.size(), 1);
@@ -153,7 +151,7 @@ public class MergeRollupSegmentConverterTest {
// Run roll-up segment converter with "ROLLUP" merge type
MergeRollupSegmentConverter rollupSegmentConverter =
new
MergeRollupSegmentConverter.Builder().setInputIndexDirs(_segmentIndexDirList).setWorkingDir(WORKING_DIR)
-
.setTableName(TABLE_NAME).setSegmentName("TestSimpleRollup").setMergeType("ROLLUP")
+
.setTableName(TABLE_NAME).setSegmentName("TestSimpleRollup").setMergeType(MergeType.ROLLUP)
.setRollupPreAggregateType(preAggregateType).setTableConfig(_tableConfig).build();
List<File> result = rollupSegmentConverter.convert();
Assert.assertEquals(result.size(), 1);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java
new file mode 100644
index 0000000..b94cf6a
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.minion.rollup.MergeRollupSegmentConverter;
+import org.apache.pinot.core.minion.rollup.MergeType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Task executor that provides merge and rollup service
+ *
+ * TODO:
+ * 1. Add the support for roll-up
+ * 2. Add the support for time split to provide backfill support for merged
segments
+ * 3. Change the way to decide the number of output segments (explicit
numPartition config -> maxNumRowsPerSegment)
+ */
+public class MergeRollupTaskExecutor extends
BaseMultipleSegmentsConversionExecutor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+
+ @Override
+ protected List<SegmentConversionResult> convert(PinotTaskConfig
pinotTaskConfig, List<File> originalIndexDirs,
+ File workingDir)
+ throws Exception {
+ Map<String, String> configs = pinotTaskConfig.getConfigs();
+ String mergeTypeString =
configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
+ // TODO: add the support for rollup
+ Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
+
+ MergeType mergeType = MergeType.fromString(mergeTypeString);
+ Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only
'CONCATENATE' mode is currently supported.");
+
+ String mergedSegmentName =
configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
+ String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+ TableConfig tableConfig =
+
ZKMetadataProvider.getTableConfig(MINION_CONTEXT.getHelixPropertyStore(),
tableNameWithType);
+
+ MergeRollupSegmentConverter rollupSegmentConverter =
+ new
MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
+
.setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
+ .setTableConfig(tableConfig).build();
+
+ List<File> resultFiles = rollupSegmentConverter.convert();
+ List<SegmentConversionResult> results = new ArrayList<>();
+ for (File file : resultFiles) {
+ String outputSegmentName = file.getName();
+ results.add(new
SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
+ .setTableNameWithType(tableNameWithType).build());
+ }
+ return results;
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
new file mode 100644
index 0000000..66d2c86
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+public class MergeRollupTaskExecutorFactory implements
PinotTaskExecutorFactory {
+ @Override
+ public PinotTaskExecutor create() {
+ return new MergeRollupTaskExecutor();
+ }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 71a35d8..ba3646c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -35,6 +35,7 @@ public class TaskExecutorFactoryRegistry {
registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
new ConvertToRawIndexTaskExecutorFactory());
registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new
PurgeTaskExecutorFactory());
+ registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new
MergeRollupTaskExecutorFactory());
}
/**
diff --git a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java
new file mode 100644
index 0000000..245a085
--- /dev/null
+++ b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.minion.rollup.MergeType;
+import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class MergeRollupTaskExecutorTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
"MergeRollupTaskExecutorTest");
+ private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR,
"originalSegment");
+ private static final File WORKING_DIR = new File(TEMP_DIR, "workingDir");
+ private static final int NUM_SEGMENTS = 10;
+ private static final int NUM_ROWS = 5;
+ private static final String MERGED_SEGMENT_NAME = "testMergedSegment";
+ private static final String TABLE_NAME = "testTable";
+ private static final String D1 = "d1";
+
+ private List<File> _segmentIndexDirList;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1,
FieldSpec.DataType.INT).build();
+
+ List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ GenericRow row = new GenericRow();
+ row.putValue(D1, i);
+ rows.add(row);
+ }
+
+ _segmentIndexDirList = new ArrayList<>();
+ for (int i = 0; i < NUM_SEGMENTS; i++) {
+ String segmentName = MERGED_SEGMENT_NAME + i;
+ RecordReader recordReader = new GenericRowRecordReader(rows);
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(segmentName);
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ driver.init(config, recordReader);
+ driver.build();
+ _segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
+ }
+
+ MinionContext minionContext = MinionContext.getInstance();
+ @SuppressWarnings("unchecked")
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore =
mock(ZkHelixPropertyStore.class);
+ when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null,
AccessOption.PERSISTENT))
+ .thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+ minionContext.setHelixPropertyStore(helixPropertyStore);
+ }
+
+ @Test
+ public void testConvert()
+ throws Exception {
+ MergeRollupTaskExecutor mergeRollupTaskExecutor = new
MergeRollupTaskExecutor();
+ Map<String, String> configs = new HashMap<>();
+ configs.put(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY,
MergeType.CONCATENATE.toString());
+ configs.put(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY,
MERGED_SEGMENT_NAME);
+ configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
+
+ PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
+ List<SegmentConversionResult> conversionResults =
+ mergeRollupTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList,
WORKING_DIR);
+
+ Assert.assertEquals(conversionResults.size(), 1);
+ Assert.assertEquals(conversionResults.get(0).getSegmentName(),
MERGED_SEGMENT_NAME);
+ File mergedSegment = conversionResults.get(0).getFile();
+ try (PinotSegmentRecordReader pinotSegmentRecordReader = new
PinotSegmentRecordReader(mergedSegment)) {
+ int numRecords = 0;
+ GenericRow row = new GenericRow();
+ while (pinotSegmentRecordReader.hasNext()) {
+ row = pinotSegmentRecordReader.next(row);
+ numRecords++;
+ }
+ Assert.assertEquals(numRecords, NUM_SEGMENTS * NUM_ROWS);
+ }
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
index 76c691d..ff37a4f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -194,9 +194,9 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
// TODO: add support for rollup
String tableName =
TableNameBuilder.extractRawTableName(tableConfig.getTableName());
MergeRollupSegmentConverter mergeRollupSegmentConverter =
- new
MergeRollupSegmentConverter.Builder().setMergeType(_mergeType).setSegmentName(_outputSegmentName)
-
.setInputIndexDirs(inputIndexDirs).setWorkingDir(workingDir).setTableName(tableName)
- .setTableConfig(tableConfig).build();
+ new
MergeRollupSegmentConverter.Builder().setMergeType(MergeType.fromString(_mergeType))
+
.setSegmentName(_outputSegmentName).setInputIndexDirs(inputIndexDirs).setWorkingDir(workingDir)
+ .setTableName(tableName).setTableConfig(tableConfig).build();
List<File> outputSegments = mergeRollupSegmentConverter.convert();
Preconditions.checkState(outputSegments.size() == 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]