This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e8db382d51 [Backfill] allow externally partitioned segment uploads for
upsert tables (#13107)
e8db382d51 is described below
commit e8db382d519aa59310f5788b75f5b1cc13143f55
Author: rohit <[email protected]>
AuthorDate: Mon Jun 10 21:48:44 2024 +0530
[Backfill] allow externally partitioned segment uploads for upsert tables
(#13107)
* [Backfill] allow externally partitioned segment uploads for upsert tables
* upload segment with partitionId
* revise uploaded realtime segment name convention
---
.../apache/pinot/common/utils/SegmentUtils.java | 6 +
.../common/utils/UploadedRealtimeSegmentName.java | 181 +++++++++++++++++++++
.../pinot/common/utils/SegmentUtilsTest.java | 60 +++++++
.../utils/UploadedRealtimeSegmentNameTest.java | 58 +++++++
.../batch/common/SegmentGenerationTaskRunner.java | 23 ++-
...oncurrentMapPartitionUpsertMetadataManager.java | 49 +++++-
...rrentMapPartitionUpsertMetadataManagerTest.java | 169 +++++++++++++++++++
.../spi/creator/SegmentGeneratorConfig.java | 21 +++
.../creator/name/SegmentNameGeneratorFactory.java | 1 +
.../name/UploadedRealtimeSegmentNameGenerator.java | 91 +++++++++++
.../spi/creator/SegmentGeneratorConfigTest.java | 15 ++
.../UploadedRealtimeSegmentNameGeneratorTest.java | 27 +--
.../spi/ingestion/batch/BatchConfigProperties.java | 1 +
13 files changed, 682 insertions(+), 20 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index aaf44dc441..8b89a2b1a5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -44,6 +44,12 @@ public class SegmentUtils {
if (llcSegmentName != null) {
return llcSegmentName.getPartitionGroupId();
}
+
+ UploadedRealtimeSegmentName uploadedRealtimeSegmentName =
UploadedRealtimeSegmentName.of(segmentName);
+ if (uploadedRealtimeSegmentName != null) {
+ return uploadedRealtimeSegmentName.getPartitionId();
+ }
+
// Otherwise, retrieve the partition id from the segment zk metadata.
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(),
realtimeTableName, segmentName);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java
new file mode 100644
index 0000000000..ec2b257b12
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like:
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * <p>This naming convention is adopted to represent a segment uploaded to a
realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but
differs in following ways:
+ *
+ * <li> prefix to quickly identify the type/source of segment e.g.
"uploaded"/"minion"
+ * <li> name of the table to which the segment belongs
+ * <li> partitionId which should be consistent as the stream partitioning in
case of upsert realtime tables.
+ * <li> creationTime creation time of segment of the format yyyyMMdd'T'HHmm'Z'
+ * <li> suffix to uniquely identify segments created at the same time.
+ *
+ * Use {@link
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator}
to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements
Comparable<UploadedRealtimeSegmentName> {
+
+ private static final String SEPARATOR = "__";
+ private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
+ private static final DateTimeFormatter DATE_FORMATTER =
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+ private final String _prefix;
+ private final String _tableName;
+ private final int _partitionId;
+ private final String _creationTime;
+ private final String _segmentName;
+ private final String _suffix;
+
+ public UploadedRealtimeSegmentName(String segmentName) {
+ try {
+ String[] parts = StringUtils.splitByWholeSeparator(segmentName,
SEPARATOR);
+ Preconditions.checkState(parts.length == 5,
+ "Uploaded segment name must be of the format
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
+ _prefix = parts[0];
+ _tableName = parts[1];
+ _partitionId = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ _suffix = parts[4];
+ _segmentName = segmentName;
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid segment name: " +
segmentName, e);
+ }
+ }
+
+ /**
+ * Constructor for UploadedRealtimeSegmentName.
+ * @param tableName
+ * @param partitionId
+ * @param msSinceEpoch
+ * @param prefix
+ * @param suffix
+ */
+ public UploadedRealtimeSegmentName(String tableName, int partitionId, long
msSinceEpoch, String prefix,
+ String suffix) {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(tableName) && !tableName.contains(SEPARATOR) &&
StringUtils.isNotBlank(prefix)
+ && !prefix.contains(SEPARATOR) && StringUtils.isNotBlank(suffix)
&& !suffix.contains(SEPARATOR),
+ "tableName, prefix and suffix must be non-null, non-empty and not
contain '__'");
+ _tableName = tableName;
+ _partitionId = partitionId;
+ _creationTime = DATE_FORMATTER.print(msSinceEpoch);
+ _prefix = prefix;
+ _suffix = suffix;
+ _segmentName = Joiner.on(SEPARATOR).join(prefix, tableName, partitionId,
_creationTime, suffix);
+ }
+
+ /**
+ * Checks if the segment name is of the format:
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ * @param segmentName
+ * @return boolean true if the segment name is of the format:
{prefix}__{tableName}__{partitionId}__{creationTime}
+ * __{suffix}
+ */
+ public static boolean isUploadedRealtimeSegmentName(String segmentName) {
+ int numSeparators = 0;
+ int index = 0;
+ while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
+ numSeparators++;
+ index += 2; // SEPARATOR.length()
+ }
+ return numSeparators == 4;
+ }
+
+ @Nullable
+ public static UploadedRealtimeSegmentName of(String segmentName) {
+ try {
+ return new UploadedRealtimeSegmentName(segmentName);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ public String getTableName() {
+ return _tableName;
+ }
+
+ public int getPartitionId() {
+ return _partitionId;
+ }
+
+ /**
+ * Returns the creation time in the format yyyyMMdd'T'HHmm'Z'
+ * To be used for only human readability and not for any computation
+ * @return
+ */
+ public String getCreationTime() {
+ return _creationTime;
+ }
+
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ public String getPrefix() {
+ return _prefix;
+ }
+
+ public String getSuffix() {
+ return _suffix;
+ }
+
+ /**
+ * Compares the string representation of the segment name.
+ * @param other the object to be compared.
+ * @return
+ */
+ @Override
+ public int compareTo(UploadedRealtimeSegmentName other) {
+ Preconditions.checkState(_tableName.equals(other._tableName),
+ "Cannot compare segment names from different table: %s, %s",
_segmentName, other.getSegmentName());
+ return _segmentName.compareTo(other._segmentName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UploadedRealtimeSegmentName)) {
+ return false;
+ }
+ UploadedRealtimeSegmentName that = (UploadedRealtimeSegmentName) o;
+ return _segmentName.equals(that._segmentName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(_segmentName);
+ }
+
+ @Override
+ public String toString() {
+ return _segmentName;
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
index 203cc249d7..1257bc0498 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
@@ -18,14 +18,29 @@
*/
package org.apache.pinot.common.utils;
+import java.util.HashMap;
+import java.util.HashSet;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.fail;
public class SegmentUtilsTest {
+ private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
private static final String SEGMENT = "testSegment";
+ private static final String PARTITION_COLUMN = "partitionColumn";
@Test
public void testGetSegmentCreationTimeMs() {
@@ -35,4 +50,49 @@ public class SegmentUtilsTest {
segmentZKMetadata.setPushTime(2000L);
assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata),
2000L);
}
+
+ @Test
+ public void testGetRealtimeSegmentPartitionIdFromZkMetadata() {
+
+ // mocks
+ SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
+ SegmentPartitionMetadata segmentPartitionMetadata =
mock(SegmentPartitionMetadata.class);
+ HashMap<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new
HashMap<>();
+ HashSet<Integer> partitions = new HashSet<>();
+ partitions.add(3);
+ columnPartitionMetadataMap.put(PARTITION_COLUMN,
+ new ColumnPartitionMetadata("modulo", 8, partitions, new HashMap<>()));
+
+
when(segmentPartitionMetadata.getColumnPartitionMap()).thenReturn(columnPartitionMetadataMap);
+
when(segmentZKMetadata.getPartitionMetadata()).thenReturn(segmentPartitionMetadata);
+
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore zkHelixPropertyStore =
mock(ZkHelixPropertyStore.class);
+
when(helixManager.getHelixPropertyStore()).thenReturn(zkHelixPropertyStore);
+
+ // mock static ZKMetadataProvider.getSegmentZKMetadata
+ try (MockedStatic<ZKMetadataProvider> zkMetadataProviderMockedStatic =
Mockito.mockStatic(
+ ZKMetadataProvider.class)) {
+
when(ZKMetadataProvider.getSegmentZKMetadata(Mockito.any(ZkHelixPropertyStore.class),
eq(TABLE_NAME_WITH_TYPE),
+ eq(SEGMENT))).thenReturn(segmentZKMetadata);
+
+ Integer partitionId =
+ SegmentUtils.getRealtimeSegmentPartitionId(SEGMENT,
TABLE_NAME_WITH_TYPE, helixManager, PARTITION_COLUMN);
+
+ assertEquals(partitionId, 3);
+ }
+ }
+
+ @Test
+ void testGetRealtimeSegmentPartitionIdForUploadedRealtimeSegment() {
+ String segmentName = "uploaded__table_name__3__100__1716185755000";
+
+ try {
+ Integer partitionId =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName,
"realtimeTableName", null, "partitionColumn");
+ assertEquals(partitionId, 3);
+ } catch (Exception e) {
+ fail("Exception should not be thrown");
+ }
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java
new file mode 100644
index 0000000000..0cfb8a0196
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentNameTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class UploadedRealtimeSegmentNameTest {
+
+ @Test
+ public void testSegmentNameParsing() {
+ String segmentName = "uploaded__table_name__1__20240530T0000Z__suffix";
+ UploadedRealtimeSegmentName uploadedRealtimeSegmentName = new
UploadedRealtimeSegmentName(segmentName);
+
+ Assert.assertEquals(uploadedRealtimeSegmentName.getTableName(),
"table_name");
+ Assert.assertEquals(uploadedRealtimeSegmentName.getPartitionId(), 1);
+ Assert.assertEquals(uploadedRealtimeSegmentName.getPrefix(), "uploaded");
+ Assert.assertEquals(uploadedRealtimeSegmentName.getSuffix(), "suffix");
+ Assert.assertEquals(uploadedRealtimeSegmentName.getCreationTime(),
"20240530T0000Z");
+ }
+
+ @Test
+ public void testSegmentNameGeneration() {
+ UploadedRealtimeSegmentName uploadedRealtimeSegmentName =
+ new UploadedRealtimeSegmentName("tableName", 1, 1717027200000L,
"uploaded", "2");
+ String expectedSegmentName = "uploaded__tableName__1__20240530T0000Z__2";
+
+ Assert.assertEquals(uploadedRealtimeSegmentName.getSegmentName(),
expectedSegmentName);
+ }
+
+ @Test
+ public void testIsUploadedRealtimeSegmentName() {
+ String validSegmentName = "uploaded__table__0__20220101T0000Z__suffix";
+
Assert.assertTrue(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(validSegmentName));
+
+ String invalidSegmentName = "uploaded__table__0__20220101T0000Z";
+
Assert.assertFalse(UploadedRealtimeSegmentName.isUploadedRealtimeSegmentName(invalidSegmentName));
+ }
+}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 8fc421f79e..37ff9bc47c 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.ingestion.batch.common;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +30,7 @@ import
org.apache.pinot.segment.spi.creator.name.InputFileSegmentNameGenerator;
import
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -159,14 +161,25 @@ public class SegmentGenerationTaskRunner implements
Serializable {
Boolean.parseBoolean(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig),
dateTimeFormatSpec,
- segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX),
- appendUUIDToSegmentName);
+ segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX),
appendUUIDToSegmentName);
case BatchConfigProperties.SegmentNameGeneratorType.INPUT_FILE:
String inputFileUri =
_taskSpec.getCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY);
return new
InputFileSegmentNameGenerator(segmentNameGeneratorConfigs.get(FILE_PATH_PATTERN),
- segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE),
- inputFileUri,
- appendUUIDToSegmentName);
+ segmentNameGeneratorConfigs.get(SEGMENT_NAME_TEMPLATE),
inputFileUri, appendUUIDToSegmentName);
+ case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+ Preconditions.checkState(segmentGeneratorConfig.getCreationTime() !=
null,
+ "Creation time must be set for uploaded realtime segment name
generator");
+
Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId()
!= -1,
+ "Valid partition id must be set for uploaded realtime segment name
generator");
+ long creationTime;
+ try {
+ creationTime =
Long.parseLong(segmentGeneratorConfig.getCreationTime());
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Creation time must be a valid
long value in segmentGeneratorConfig");
+ }
+ return new UploadedRealtimeSegmentNameGenerator(tableName,
+ segmentGeneratorConfig.getUploadedSegmentPartitionId(),
creationTime,
+ segmentGeneratorConfig.getSegmentNamePrefix(),
segmentGeneratorConfig.getSegmentNamePostfix());
default:
throw new UnsupportedOperationException("Unsupported segment name
generator type: " + segmentNameGeneratorType);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 5980cbbace..86146b8802 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.LazyRow;
import org.apache.pinot.segment.local.utils.HashUtils;
@@ -135,10 +136,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
// Update the record location when getting a newer comparison
value, or the value is the same as the
// current value, but the segment has a larger sequence number
(the segment is newer than the current
// segment).
- if (comparisonResult > 0 || (comparisonResult == 0 &&
LLCSegmentName.isLLCSegment(segmentName)
- && LLCSegmentName.isLLCSegment(currentSegmentName)
- && LLCSegmentName.getSequenceNumber(segmentName) >
LLCSegmentName.getSequenceNumber(
- currentSegmentName))) {
+ if (comparisonResult > 0 || (comparisonResult == 0 &&
shouldReplaceOnComparisonTie(segmentName,
+ currentSegmentName,
segment.getSegmentMetadata().getIndexCreationTime(),
+
currentSegment.getSegmentMetadata().getIndexCreationTime()))) {
replaceDocId(segment, validDocIds, queryableDocIds,
currentSegment, currentDocId, newDocId, recordInfo);
return new RecordLocation(segment, newDocId,
newComparisonValue);
} else {
@@ -158,6 +158,47 @@ public class ConcurrentMapPartitionUpsertMetadataManager
extends BasePartitionUp
}
}
+ /**
+ * <li> When the replacing segment and current segment are of {@link
LLCSegmentName} then the PK should resolve to
+ * row in segment with higher sequence id.
+ * <li> If either or both are not LLC segment, then resolve based on
creation time of segment. If creation time is
+ * same then prefer uploaded segment if other is LLCSegmentName
+ * <li> If both are uploaded segment, prefer standard
UploadedRealtimeSegmentName, if still a tie, then resolve to
+ * current segment.
+ *
+ * @param segmentName replacing segment name
+ * @param currentSegmentName current segment name having the record for the
given primary key
+ * @param segmentCreationTimeMs replacing segment creation time
+ * @param currentSegmentCreationTimeMs current segment creation time
+ * @return true if the record in replacing segment should replace the record
in current segment
+ */
+ protected boolean shouldReplaceOnComparisonTie(String segmentName, String
currentSegmentName,
+ long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+ // resolve using sequence id if both are LLCSegmentName
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ LLCSegmentName currentLLCSegmentName =
LLCSegmentName.of(currentSegmentName);
+ if (llcSegmentName != null && currentLLCSegmentName != null) {
+ return llcSegmentName.getSequenceNumber() >
currentLLCSegmentName.getSequenceNumber();
+ }
+
+ // either or both are uploaded segments, prefer the latest segment
+ int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs,
currentSegmentCreationTimeMs);
+ if (creationTimeComparisonRes != 0) {
+ return creationTimeComparisonRes > 0;
+ }
+
+ // if both are uploaded segment, prefer standard
UploadedRealtimeSegmentName, if still a tie, then resolve to
+ // current segment
+ if (UploadedRealtimeSegmentName.of(currentSegmentName) != null) {
+ return false;
+ }
+ if (UploadedRealtimeSegmentName.of(segmentName) != null) {
+ return true;
+ }
+ return false;
+ }
+
@Override
protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment,
ThreadSafeMutableRoaringBitmap validDocIds,
@Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
Iterator<RecordInfo> recordInfoIterator) {
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index d6f3107b74..4270e9547d 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -378,6 +379,143 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
}
  /**
   * Exercises primary-key tie resolution when segments built by mockUploadedImmutableSegment (uploaded realtime
   * segment names) are added next to, and replace, other segments. After each step, the primary key -> record
   * location map and every segment's valid doc ids are asserted.
   *
   * <p>Steps: add segment1 (index creation time 1000); add uploadedSegment2 with the same creation time; replace it
   * with newUploadedSegment2 (1020); add uploadedSegment3 (1040); add uploadedSegment4 (1040, same as
   * uploadedSegment3); then remove all segments and stop/close the manager.
   */
  @Test
  public void verifyAddReplaceUploadedSegment1()
      throws IOException {
    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
            _contextBuilder.setHashFunction(HashFunction.NONE).build());
    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;

    // Add the first segment, with an index creation time of 1000.
    // NOTE(review): segment1's name comes from mockImmutableSegmentWithSegmentMetadata (not visible here); the
    // expectations below assume it is not an uploaded-format name — confirm.
    int numRecords = 6;
    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
    ImmutableSegmentImpl segment1 =
        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, primaryKeys1, segmentMetadata, null);
    List<RecordInfo> recordInfoList1;
    // get recordInfo by iterating all records.
    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
    trackedSegments.add(segment1);
    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
    assertEquals(recordLocationMap.size(), 3);
    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});

    // Add the second segment of uploaded name format with same creation time.
    // Key 0 ties with segment1 on both comparison value (100) and creation time (1000), so the uploaded-format name
    // is expected to win the tie; key 3 is new.
    numRecords = 2;
    primaryKeys = new int[]{0, 3};
    timestamps = new int[]{100, 80};
    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl uploadedSegment2 =
        mockUploadedImmutableSegment("2", validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1000L);
    List<RecordInfo> recordInfoList2;
    // get recordInfo by iterating all records.
    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, recordInfoList2.iterator());
    trackedSegments.add(uploadedSegment2);

    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
    // uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4});
    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // replace uploadedSegment2 with a copy of the same records whose creation time is 1020; the copy takes over
    // keys 0 and 3 (the same record info list is iterated again).
    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl newUploadedSegment2 =
        mockUploadedImmutableSegment("2", newValidDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1020L);
    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, null, recordInfoList2.iterator(),
        uploadedSegment2);
    trackedSegments.add(newUploadedSegment2);
    trackedSegments.remove(uploadedSegment2);

    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
    // newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4});
    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

    // add uploadedSegment3 with higher creation time than newUploadedSegment2 (1040 > 1020): key 0 ties on the
    // comparison value (100), so the later-created uploadedSegment3 is expected to win it.
    // NOTE(review): unlike segment1/uploadedSegment2, uploadedSegment3 and uploadedSegment4 are not added to
    // trackedSegments — confirm this is intended.
    numRecords = 1;
    primaryKeys = new int[]{0};
    timestamps = new int[]{100};
    ThreadSafeMutableRoaringBitmap validDocIds3 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl uploadedSegment3 =
        mockUploadedImmutableSegment("3", validDocIds3, null, getPrimaryKeyList(numRecords, primaryKeys), 1040L);
    List<RecordInfo> recordInfoList3;
    // get recordInfo by iterating all records.
    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, recordInfoList3.iterator());

    // segment1: 1 -> {4, 120}, 2 -> {2, 100}
    // newUploadedSegment2: 3 -> {1, 80}
    // uploadedSegment3: 0 -> {0, 100}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4});
    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new int[]{0});

    // add uploadedSegment4 with higher creation time than segment 1 and same creation time as uploadedSegment3.
    // Key 0: both segments have uploaded-format names and equal creation times, so the current holder
    // (uploadedSegment3) is kept. Key 1: it ties with segment1 on the comparison value (120), and uploadedSegment4 is
    // newer (1040 > 1000), so it takes the key.
    numRecords = 2;
    primaryKeys = new int[]{0, 1};
    timestamps = new int[]{100, 120};
    ThreadSafeMutableRoaringBitmap validDocIds4 = new ThreadSafeMutableRoaringBitmap();
    ImmutableSegmentImpl uploadedSegment4 =
        mockUploadedImmutableSegment("4", validDocIds4, null, getPrimaryKeyList(numRecords, primaryKeys), 1040L);
    List<RecordInfo> recordInfoList4;
    // get recordInfo by iterating all records.
    recordInfoList4 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
    upsertMetadataManager.addSegment(uploadedSegment4, validDocIds4, null, recordInfoList4.iterator());

    // segment1: 2 -> {2, 100}
    // newUploadedSegment2: 3 -> {1, 80}
    // uploadedSegment3: 0 -> {0, 100}
    // uploadedSegment4: 1 -> {1, 120}
    assertEquals(recordLocationMap.size(), 4);
    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, HashFunction.NONE);
    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2});
    assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1});
    assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new int[]{0});
    assertEquals(validDocIds4.getMutableRoaringBitmap().toArray(), new int[]{1});

    // remove segments
    // NOTE(review): uploadedSegment2 was already replaced above; removing it too presumably checks that removing a
    // replaced segment is harmless — confirm intent.
    upsertMetadataManager.removeSegment(segment1);
    upsertMetadataManager.removeSegment(uploadedSegment2);
    upsertMetadataManager.removeSegment(newUploadedSegment2);
    upsertMetadataManager.removeSegment(uploadedSegment3);
    upsertMetadataManager.removeSegment(uploadedSegment4);

    // Stop the metadata manager
    upsertMetadataManager.stop();

    // Close the metadata manager
    upsertMetadataManager.close();
  }
+
private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction
hashFunction, boolean enableSnapshot)
throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
@@ -606,6 +744,33 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+ return segment;
+ }
+
+ private static ImmutableSegmentImpl mockUploadedImmutableSegment(String
suffix,
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable
ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys, Long creationTimeMs) {
+ if (creationTimeMs == null) {
+ creationTimeMs = System.currentTimeMillis();
+ }
+ ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
+
when(segment.getSegmentName()).thenReturn(getUploadedRealtimeSegmentName(creationTimeMs,
suffix));
+ when(segment.getValidDocIds()).thenReturn(validDocIds);
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
+ DataSource dataSource = mock(DataSource.class);
+ when(segment.getDataSource(anyString())).thenReturn(dataSource);
+ ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
+ when(forwardIndex.isSingleValue()).thenReturn(true);
+ when(forwardIndex.getStoredType()).thenReturn(DataType.INT);
+ when(forwardIndex.getInt(anyInt(), any())).thenAnswer(
+ invocation ->
primaryKeys.get(invocation.getArgument(0)).getValues()[0]);
+ when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
+ SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+ when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+ when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
return segment;
}
@@ -656,6 +821,10 @@ public class
ConcurrentMapPartitionUpsertMetadataManagerTest {
return new LLCSegmentName(RAW_TABLE_NAME, 0, sequenceNumber,
System.currentTimeMillis()).toString();
}
+ private static String getUploadedRealtimeSegmentName(long creationTimeMs,
String suffix) {
+ return new UploadedRealtimeSegmentName(RAW_TABLE_NAME, 0, creationTimeMs,
"uploaded", suffix).toString();
+ }
+
private static PrimaryKey makePrimaryKey(int value) {
return new PrimaryKey(new Object[]{value});
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index c879c1be52..0742a814f0 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -40,6 +40,7 @@ import
org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
import
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
import org.apache.pinot.segment.spi.index.FieldIndexConfigsUtil;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -107,6 +108,8 @@ public class SegmentGeneratorConfig implements Serializable
{
private String _creatorVersion = null;
private SegmentNameGenerator _segmentNameGenerator = null;
private SegmentPartitionConfig _segmentPartitionConfig = null;
+
+ private int _uploadedSegmentPartitionId = -1;
private int _sequenceId = -1;
private TimeColumnType _timeColumnType = TimeColumnType.EPOCH;
private DateTimeFormatSpec _dateTimeFormatSpec = null;
@@ -463,6 +466,9 @@ public class SegmentGeneratorConfig implements Serializable
{
_segmentTimeColumnName = timeColumnName;
}
+ public int getUploadedSegmentPartitionId() {
+ return _uploadedSegmentPartitionId;
+ }
public int getSequenceId() {
return _sequenceId;
}
@@ -475,6 +481,13 @@ public class SegmentGeneratorConfig implements
Serializable {
return _fstTypeForFSTIndex;
}
+ /**
+ * Sets the partitionId for a segment that is partitioned externally before upload; -1 (the default) means unset.
+ */
+ public void setUploadedSegmentPartitionId(int partitionId) {
+ _uploadedSegmentPartitionId = partitionId;
+ }
+
/**
* This method should be used instead of setPostfix if you are adding a
sequence number.
*/
@@ -581,6 +594,9 @@ public class SegmentGeneratorConfig implements Serializable
{
IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig),
_dateTimeFormatSpec,
_segmentNamePostfix);
+ case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+ return new UploadedRealtimeSegmentNameGenerator(_rawTableName,
_uploadedSegmentPartitionId,
+ Long.parseLong(_segmentCreationTime), _segmentNamePrefix,
_segmentNamePostfix);
default:
return new SimpleSegmentNameGenerator(_segmentNamePrefix != null ?
_segmentNamePrefix : _rawTableName,
_segmentNamePostfix);
@@ -600,6 +616,11 @@ public class SegmentGeneratorConfig implements
Serializable {
return BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE;
}
+ // if segment is externally partitioned
+ if (_uploadedSegmentPartitionId != -1) {
+ return BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME;
+ }
+
return BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
index 9fd6c97f35..498079da82 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameGeneratorFactory.java
@@ -32,6 +32,7 @@ public class SegmentNameGeneratorFactory {
public static final String FIXED_SEGMENT_NAME_GENERATOR = "fixed";
public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR =
"normalizeddate";
+ public static final String UPLOADED_REALTIME = "uploadedrealtime";
private SegmentNameGeneratorFactory() {
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java
new file mode 100644
index 0000000000..223649734d
--- /dev/null
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Implementation for generating segment names of the format
UploadedRealtimeSegmentName:
+ * {prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * <p>Naming convention for segments uploaded to a realtime table; see UploadedRealtimeSegmentName. The semantics
+ * are similar to LLCSegmentName. Prefer this naming convention when the data in the generated segments is already
+ * partitioned and each segment must be assigned by partitionId, to stay consistent with stream partitioning for
+ * upsert tables. If no suffix is configured, the sequenceId passed to generateSegmentName is used as the suffix.
+ */
+public class UploadedRealtimeSegmentNameGenerator implements
SegmentNameGenerator {
+
+ private static final String DELIMITER = "__";
+ private final String _tableName;
+ private final int _partitionId;
+ private final long _creationTimeMillis;
+ private final String _prefix;
+
+ // if suffix is not set then sequenceId is used as segment name suffix
+ @Nullable
+ private final String _suffix;
+
+ /**
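+ * Creates an UploadedRealtimeSegmentNameGenerator.
+ * @param tableName raw table name; must not be blank or contain the "__" delimiter
+ * @param partitionId partition id embedded in the generated segment name
+ * @param creationTimeMillis segment creation time in milliseconds; must be greater than 0
+ * @param prefix segment name prefix; must not be blank or contain the "__" delimiter
+ * @param suffix optional suffix; if null, the sequenceId is used instead. When provided, it must not be blank or contain the "__" delimiter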
+ */
+ public UploadedRealtimeSegmentNameGenerator(String tableName, int
partitionId, long creationTimeMillis, String prefix,
+ @Nullable String suffix) {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(tableName) && !tableName.contains(DELIMITER) &&
StringUtils.isNotBlank(prefix)
+ && !prefix.contains(DELIMITER), "Invalid tableName or prefix for
UploadedRealtimeSegmentNameGenerator");
+ Preconditions.checkArgument(creationTimeMillis > 0, "Creation time must be
greater than 0");
+ if (suffix != null) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(suffix) &&
!suffix.contains(DELIMITER),
+ "Invalid suffix for UploadedRealtimeSegmentNameGenerator");
+ }
+ _tableName = tableName;
+ _partitionId = partitionId;
+ _creationTimeMillis = creationTimeMillis;
+ _prefix = prefix;
+ _suffix = suffix;
+ }
+
+ @Override
+ public String generateSegmentName(int sequenceId, @Nullable Object
minTimeValue, @Nullable Object maxTimeValue) {
+ return Joiner.on(DELIMITER).join(_prefix, _tableName, _partitionId,
_creationTimeMillis,
+ StringUtils.isBlank(_suffix) ? sequenceId : _suffix);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder =
+ new StringBuilder("UploadedRealtimeSegmentNameGenerator:
tableName=").append(_tableName);
+ stringBuilder.append(", prefix=").append(_prefix);
+ stringBuilder.append(", partitionId=").append(_partitionId);
+ if (_suffix != null) {
+ stringBuilder.append(", suffix=").append(_suffix);
+ }
+ stringBuilder.append(", creationTimeMillis=").append(_creationTimeMillis);
+ return stringBuilder.toString();
+ }
+}
diff --git
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
index 36a7a0b0dd..39e4ce9c10 100644
---
a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfigTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pinot.segment.spi.creator.name.FixedSegmentNameGenerator;
import
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
+import
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -140,6 +141,20 @@ public class SegmentGeneratorConfigTest {
Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator()
instanceof SimpleSegmentNameGenerator);
Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator().toString().contains("tableName=testTable"));
+ // Segment is externally partitioned (uploaded partition id is set) for a realtime table
+ tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("test").build();
+
+ segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+ segmentGeneratorConfig.setUploadedSegmentPartitionId(0);
+ segmentGeneratorConfig.setCreationTime("1234567890");
+ segmentGeneratorConfig.setSegmentNamePrefix("prefix");
+ segmentGeneratorConfig.setSegmentNamePostfix("5");
+
+ Assert.assertTrue(segmentGeneratorConfig.getSegmentNameGenerator()
instanceof UploadedRealtimeSegmentNameGenerator);
+ Assert.assertTrue(
+
segmentGeneratorConfig.getSegmentNameGenerator().toString().contains("tableName=test"));
+
// Table config has no time column defined
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java
similarity index 55%
copy from
pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
copy to
pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java
index 203cc249d7..71ddd12a83 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
+++
b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGeneratorTest.java
@@ -16,23 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.common.utils;
+package org.apache.pinot.segment.spi.creator.name;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.*;
-public class SegmentUtilsTest {
- private static final String SEGMENT = "testSegment";
+public class UploadedRealtimeSegmentNameGeneratorTest {
@Test
- public void testGetSegmentCreationTimeMs() {
- SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(SEGMENT);
- segmentZKMetadata.setCreationTime(1000L);
- assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata),
1000L);
- segmentZKMetadata.setPushTime(2000L);
- assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata),
2000L);
+ public void testGenerateSegmentName() {
+ String tableName = "tableName";
+ int partitionId = 1;
+ long creationTimeMillis = 1234567890L;
+ int sequenceId = 2;
+
+ UploadedRealtimeSegmentNameGenerator generator =
+ new UploadedRealtimeSegmentNameGenerator(tableName, partitionId,
creationTimeMillis, "prefix", "suffix");
+ String expectedSegmentName = "prefix__tableName__1__1234567890__suffix";
+
+ String actualSegmentName = generator.generateSegmentName(sequenceId, null,
null);
+
+ assertEquals(actualSegmentName, expectedSegmentName);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
index 0782b7a8ec..efb11bc633 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfigProperties.java
@@ -74,6 +74,7 @@ public class BatchConfigProperties {
public static final String SIMPLE = "simple";
public static final String NORMALIZED_DATE = "normalizedDate";
public static final String FIXED = "fixed";
+ public static final String UPLOADED_REALTIME = "uploadedRealtime";
public static final String INPUT_FILE = "inputFile";
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]