This is an automated email from the ASF dual-hosted git repository.
yupeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7add772 Add option to store the hash of the upsert primary key (#7246)
7add772 is described below
commit 7add77202e543f0932db6c0d8557cab9baa95bc4
Author: Yupeng Fu <[email protected]>
AuthorDate: Tue Aug 10 11:42:49 2021 -0700
Add option to store the hash of the upsert primary key (#7246)
* add option to store the hash of the upsert primary key
* use byte equality
* fix unit tests (UT)
* fix
* fix integration tests
* add header
* comments
* comments
* comments
* fix unit tests (UT)
* comments
---
.../common/utils/config/TableConfigSerDeTest.java | 3 +-
.../controller/helix/PinotResourceManagerTest.java | 2 +-
.../realtime/LLRealtimeSegmentDataManager.java | 1 +
.../manager/realtime/RealtimeTableDataManager.java | 3 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 6 +-
.../tests/BaseClusterIntegrationTest.java | 2 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 5 +-
.../local/realtime/impl/RealtimeSegmentConfig.java | 19 ++-
.../upsert/PartitionUpsertMetadataManager.java | 28 +++-
.../local/upsert/TableUpsertMetadataManager.java | 8 +-
.../pinot/segment/local/utils/HashUtils.java | 25 ++--
.../mutable/MutableSegmentImplTestUtils.java | 7 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 7 +-
.../mutable/MutableSegmentImplUpsertTest.java | 32 +++--
.../upsert/PartitionUpsertMetadataManagerTest.java | 148 +++++++++++++--------
.../pinot/segment/local/utils/HashUtilsTest.java | 24 ++--
.../segment/local/utils/TableConfigUtilsTest.java | 14 +-
.../apache/pinot/spi/config/table/TableConfig.java | 5 +
.../pinot/spi/config/table/UpsertConfig.java | 15 ++-
.../apache/pinot/spi/data/readers/PrimaryKey.java | 5 +
.../java/org/apache/pinot/spi/utils/ByteArray.java | 3 +-
.../pinot/spi/config/table/UpsertConfigTest.java | 9 +-
.../pinot/spi/data/readers/PrimaryKeyTest.java | 11 ++
23 files changed, 253 insertions(+), 129 deletions(-)
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 08b364c..cb48ef5 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -246,7 +246,8 @@ public class TableConfigSerDeTest {
}
{
// with upsert config
- UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL,
null, "comparison");
+ UpsertConfig upsertConfig =
+ new UpsertConfig(UpsertConfig.Mode.FULL, null, "comparison",
UpsertConfig.HashFunction.NONE);
TableConfig tableConfig =
tableConfigBuilder.setUpsertConfig(upsertConfig).build();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 58f993a..939cfe9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -68,7 +68,7 @@ public class PinotResourceManagerTest {
realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING);
realtimeTableConfig.getValidationConfig()
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1));
- realtimeTableConfig.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL, null, null));
+ realtimeTableConfig.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL, null, null, null));
ControllerTestUtils.getHelixResourceManager().addTable(realtimeTableConfig);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 31b4151..2a9e4b5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1260,6 +1260,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
.setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
+ .setHashFunction(tableConfig.getHashFunction())
.setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn());
// Create message decoder
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 64fdb4d..1aae911 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -168,8 +168,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
partialUpsertHandler =
new PartialUpsertHandler(_helixManager, _tableNameWithType,
upsertConfig.getPartialUpsertStrategies());
}
+ UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction();
_tableUpsertMetadataManager =
- new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics,
partialUpsertHandler);
+ new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics,
partialUpsertHandler, hashFunction);
_primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(!CollectionUtils.isEmpty(_primaryKeyColumns),
"Primary key columns must be configured for upsert");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 956ad31..b691773 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -44,6 +44,7 @@ import
org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
@@ -123,8 +124,9 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.heap);
ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.heap);
- ((ImmutableSegmentImpl) _upsertIndexSegment)
- .enableUpsert(new PartitionUpsertMetadataManager("testTable_REALTIME",
0, serverMetrics, null),
+ ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
+ new PartitionUpsertMetadataManager("testTable_REALTIME", 0,
serverMetrics, null,
+ UpsertConfig.HashFunction.NONE),
new ThreadSafeMutableRoaringBitmap());
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 50da64b..de4ff03 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -386,7 +386,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null)).build();
}
/**
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 73c6d4d..9cd4153 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -512,8 +512,9 @@ public class MutableSegmentImpl implements MutableSegment {
private GenericRow handleUpsert(GenericRow row, int docId) {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
- Preconditions.checkState(upsertComparisonValue instanceof Comparable,
- "Upsert comparison column: %s must be comparable",
_upsertComparisonColumn);
+ Preconditions
+ .checkState(upsertComparisonValue instanceof Comparable, "Upsert
comparison column: %s must be comparable",
+ _upsertComparisonColumn);
return _partitionUpsertMetadataManager.updateRecord(this,
new PartitionUpsertMetadataManager.RecordInfo(primaryKey, docId,
(Comparable) upsertComparisonValue), row);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 43f9343..0a1252f 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -57,6 +57,7 @@ public class RealtimeSegmentConfig {
private final boolean _aggregateMetrics;
private final boolean _nullHandlingEnabled;
private final UpsertConfig.Mode _upsertMode;
+ private final UpsertConfig.HashFunction _hashFunction;
private final String _upsertComparisonColumn;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final String _consumerDir;
@@ -69,7 +70,7 @@ public class RealtimeSegmentConfig {
RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap,
PinotDataBufferMemoryManager memoryManager,
RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction,
int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir,
- UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
+ UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
UpsertConfig.HashFunction hashFunction,
PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_tableNameWithType = tableNameWithType;
_segmentName = segmentName;
@@ -96,6 +97,7 @@ public class RealtimeSegmentConfig {
_nullHandlingEnabled = nullHandlingEnabled;
_consumerDir = consumerDir;
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
+ _hashFunction = hashFunction != null ? hashFunction :
UpsertConfig.HashFunction.NONE;
_upsertComparisonColumn = upsertComparisonColumn;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
}
@@ -205,6 +207,10 @@ public class RealtimeSegmentConfig {
return _upsertMode;
}
+ public UpsertConfig.HashFunction getHashFunction() {
+ return _hashFunction;
+ }
+
public String getUpsertComparisonColumn() {
return _upsertComparisonColumn;
}
@@ -239,6 +245,7 @@ public class RealtimeSegmentConfig {
private boolean _nullHandlingEnabled = false;
private String _consumerDir;
private UpsertConfig.Mode _upsertMode;
+ private UpsertConfig.HashFunction _hashFunction;
private String _upsertComparisonColumn;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
@@ -378,11 +385,16 @@ public class RealtimeSegmentConfig {
return this;
}
+ public Builder setHashFunction(UpsertConfig.HashFunction hashFunction) {
+ _hashFunction = hashFunction;
+ return this;
+ }
+
public Builder setUpsertComparisonColumn(String upsertComparisonColumn) {
_upsertComparisonColumn = upsertComparisonColumn;
return this;
}
-
+
public Builder
setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager
partitionUpsertMetadataManager) {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
return this;
@@ -393,7 +405,8 @@ public class RealtimeSegmentConfig {
_capacity, _avgNumMultiValues, _noDictionaryColumns,
_varLengthDictionaryColumns, _invertedIndexColumns,
_textIndexColumns, _fstIndexColumns, _jsonIndexColumns,
_h3IndexConfigs, _realtimeSegmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction,
_partitionId, _aggregateMetrics,
- _nullHandlingEnabled, _consumerDir, _upsertMode,
_upsertComparisonColumn, _partitionUpsertMetadataManager);
+ _nullHandlingEnabled, _consumerDir, _upsertMode,
_upsertComparisonColumn, _hashFunction,
+ _partitionUpsertMetadataManager);
}
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 6bd9d1b..64d3192 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -26,10 +26,13 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,10 +69,11 @@ public class PartitionUpsertMetadataManager {
private final int _partitionId;
private final ServerMetrics _serverMetrics;
private final PartialUpsertHandler _partialUpsertHandler;
+ private final UpsertConfig.HashFunction _hashFunction;
// TODO(upsert): consider an off-heap KV store to persist this mapping to
improve the recovery speed.
@VisibleForTesting
- final ConcurrentHashMap<PrimaryKey, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, RecordLocation>
_primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
// Reused for reading previous record during partial upsert
private final GenericRow _reuse = new GenericRow();
@@ -77,11 +81,12 @@ public class PartitionUpsertMetadataManager {
private GenericRow _result;
public PartitionUpsertMetadataManager(String tableNameWithType, int
partitionId, ServerMetrics serverMetrics,
- @Nullable PartialUpsertHandler partialUpsertHandler) {
+ @Nullable PartialUpsertHandler partialUpsertHandler,
UpsertConfig.HashFunction hashFunction) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_serverMetrics = serverMetrics;
_partialUpsertHandler = partialUpsertHandler;
+ _hashFunction = hashFunction;
}
/**
@@ -95,7 +100,8 @@ public class PartitionUpsertMetadataManager {
while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
- _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey,
(primaryKey, currentRecordLocation) -> {
+ _primaryKeyToRecordLocationMap
+ .compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
@@ -176,7 +182,8 @@ public class PartitionUpsertMetadataManager {
}
_result = record;
- _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey,
(primaryKey, currentRecordLocation) -> {
+ _primaryKeyToRecordLocationMap
+ .compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
@@ -238,6 +245,19 @@ public class PartitionUpsertMetadataManager {
_primaryKeyToRecordLocationMap.size());
}
+ protected static Object hashPrimaryKey(PrimaryKey primaryKey,
UpsertConfig.HashFunction hashFunction) {
+ switch (hashFunction) {
+ case NONE:
+ return primaryKey;
+ case MD5:
+ return new ByteArray(HashUtils.hashMD5(primaryKey.asBytes()));
+ case MURMUR3:
+ return new ByteArray(HashUtils.hashMurmur3(primaryKey.asBytes()));
+ default:
+ throw new IllegalArgumentException(String.format("Unrecognized hash
function %s", hashFunction));
+ }
+ }
+
public static final class RecordInfo {
private final PrimaryKey _primaryKey;
private final int _docId;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index ae8fb3f..d78cfc0 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.UpsertConfig;
/**
@@ -34,16 +35,19 @@ public class TableUpsertMetadataManager {
private final String _tableNameWithType;
private final ServerMetrics _serverMetrics;
private final PartialUpsertHandler _partialUpsertHandler;
+ private final UpsertConfig.HashFunction _hashFunction;
public TableUpsertMetadataManager(String tableNameWithType, ServerMetrics
serverMetrics,
- @Nullable PartialUpsertHandler partialUpsertHandler) {
+ @Nullable PartialUpsertHandler partialUpsertHandler,
UpsertConfig.HashFunction hashFunction) {
_tableNameWithType = tableNameWithType;
_serverMetrics = serverMetrics;
_partialUpsertHandler = partialUpsertHandler;
+ _hashFunction = hashFunction;
}
public PartitionUpsertMetadataManager getOrCreatePartitionManager(int
partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
- k -> new PartitionUpsertMetadataManager(_tableNameWithType, k,
_serverMetrics, _partialUpsertHandler));
+ k -> new PartitionUpsertMetadataManager(_tableNameWithType, k,
_serverMetrics, _partialUpsertHandler,
+ _hashFunction));
}
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java
similarity index 57%
copy from
pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
copy to
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java
index 97faad8..dfd9a5c 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/HashUtils.java
@@ -16,25 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.readers;
+package org.apache.pinot.segment.local.utils;
-import org.testng.annotations.Test;
+import com.google.common.hash.Hashing;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
+public class HashUtils {
+ private HashUtils() {
+ }
-public class PrimaryKeyTest {
-
- @Test
- public void testPrimaryKeyComparison() {
- PrimaryKey left = new PrimaryKey(new Object[]{"111", 2});
- PrimaryKey right = new PrimaryKey(new Object[]{"111", 2});
- assertEquals(left, right);
- assertEquals(left.hashCode(), right.hashCode());
+ public static byte[] hashMurmur3(byte[] bytes) {
+ return Hashing.murmur3_128().hashBytes(bytes).asBytes();
+ }
- right = new PrimaryKey(new Object[]{"222", 2});
- assertNotEquals(left, right);
- assertNotEquals(left.hashCode(), right.hashCode());
+ public static byte[] hashMD5(byte[] bytes) {
+ return Hashing.md5().hashBytes(bytes).asBytes();
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 4f52009..e88ec3c 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -63,6 +63,8 @@ public class MutableSegmentImplTestUtils {
UpsertConfig.Mode upsertMode = upsertConfig == null ?
UpsertConfig.Mode.NONE : upsertConfig.getMode();
String comparisonColumn = upsertConfig == null ? null :
upsertConfig.getComparisonColumn();
+ UpsertConfig.HashFunction hashFunction =
+ upsertConfig == null ? UpsertConfig.HashFunction.NONE :
upsertConfig.getHashFunction();
RealtimeSegmentConfig realtimeSegmentConfig =
new
RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
.setStreamName(STEAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
@@ -71,9 +73,8 @@ public class MutableSegmentImplTestUtils {
.setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
.setMemoryManager(new
DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
- .setUpsertComparisonColumn(comparisonColumn)
- .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
- .build();
+
.setUpsertComparisonColumn(comparisonColumn).setHashFunction(hashFunction)
+
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).build();
return new MutableSegmentImpl(realtimeSegmentConfig, null);
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index b7ae126..5a08556 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -57,15 +57,16 @@ public class MutableSegmentImplUpsertComparisonColTest {
URL dataResourceUrl =
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
"offset")).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
"offset", null)).build();
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
- new TableUpsertMetadataManager("testTable_REALTIME",
Mockito.mock(ServerMetrics.class), null)
+ new TableUpsertMetadataManager("testTable_REALTIME",
Mockito.mock(ServerMetrics.class), null,
+ UpsertConfig.HashFunction.NONE)
.getOrCreatePartitionManager(0);
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
- false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null,
"offset"), "secondsSinceEpoch",
+ false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null,
"offset", null), "secondsSinceEpoch",
_partitionUpsertMetadataManager);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = RecordReaderFactory
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index 2b367cd..d62a6ed 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -37,35 +37,34 @@ import
org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class MutableSegmentImplUpsertTest {
private static final String SCHEMA_FILE_PATH =
"data/test_upsert_schema.json";
private static final String DATA_FILE_PATH = "data/test_upsert_data.json";
- private static CompositeTransformer _recordTransformer;
- private static Schema _schema;
- private static TableConfig _tableConfig;
- private static MutableSegmentImpl _mutableSegmentImpl;
- private static PartitionUpsertMetadataManager
_partitionUpsertMetadataManager;
+ private CompositeTransformer _recordTransformer;
+ private Schema _schema;
+ private TableConfig _tableConfig;
+ private MutableSegmentImpl _mutableSegmentImpl;
+ private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
- @BeforeClass
- public void setup()
+ private void setup(UpsertConfig.HashFunction hashFunction)
throws Exception {
URL schemaResourceUrl =
this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl =
this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName("testTable")
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
hashFunction)).build();
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
_partitionUpsertMetadataManager =
- new TableUpsertMetadataManager("testTable_REALTIME",
Mockito.mock(ServerMetrics.class), null)
+ new TableUpsertMetadataManager("testTable_REALTIME",
Mockito.mock(ServerMetrics.class), null,
+ hashFunction)
.getOrCreatePartitionManager(0);
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(_schema, Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(),
- false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null),
"secondsSinceEpoch",
+ false, true, new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
hashFunction), "secondsSinceEpoch",
_partitionUpsertMetadataManager);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = RecordReaderFactory
@@ -80,7 +79,16 @@ public class MutableSegmentImplUpsertTest {
}
@Test
- public void testUpsertIngestion() {
+ public void testHashFunctions()
+ throws Exception {
+ testUpsertIngestion(UpsertConfig.HashFunction.NONE);
+ testUpsertIngestion(UpsertConfig.HashFunction.MD5);
+ testUpsertIngestion(UpsertConfig.HashFunction.MURMUR3);
+ }
+
+ private void testUpsertIngestion(UpsertConfig.HashFunction hashFunction)
+ throws Exception {
+ setup(hashFunction);
ImmutableRoaringBitmap bitmap =
_mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
Assert.assertFalse(bitmap.contains(0));
Assert.assertTrue(bitmap.contains(1));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
index 6812d63..ce91f01 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
@@ -26,9 +26,13 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -45,9 +49,15 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testAddSegment() {
+ verifyAddSegment(UpsertConfig.HashFunction.NONE);
+ verifyAddSegment(UpsertConfig.HashFunction.MD5);
+ verifyAddSegment(UpsertConfig.HashFunction.MURMUR3);
+ }
+
+ private void verifyAddSegment(UpsertConfig.HashFunction hashFunction) {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null);
- Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null, hashFunction);
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
List<PartitionUpsertMetadataManager.RecordInfo> recordInfoList1 = new
ArrayList<>();
@@ -61,9 +71,9 @@ public class PartitionUpsertMetadataManagerTest {
ImmutableSegmentImpl segment1 = mockSegment(1, validDocIds1);
upsertMetadataManager.addSegment(segment1, recordInfoList1.iterator());
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 5, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{2, 4, 5});
// Add the second segment
@@ -78,10 +88,10 @@ public class PartitionUpsertMetadataManagerTest {
upsertMetadataManager.addSegment(segment2, recordInfoList2.iterator());
// segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 4, 120);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
@@ -92,26 +102,28 @@ public class PartitionUpsertMetadataManagerTest {
// original segment1: 1 -> {4, 120}
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
- assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(),
newSegment1);
+ assertSame(recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(1), hashFunction))
+ .getSegment(), newSegment1);
// Remove the original segment1
upsertMetadataManager.removeSegment(segment1);
// segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
// new segment1: 1 -> {4, 120}
- checkRecordLocation(recordLocationMap, 0, segment2, 0, 100);
- checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120);
- checkRecordLocation(recordLocationMap, 2, segment2, 2, 120);
- checkRecordLocation(recordLocationMap, 3, segment2, 3, 80);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120,
hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 2, 3});
assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{4});
- assertSame(recordLocationMap.get(getPrimaryKey(1)).getSegment(),
newSegment1);
+ assertSame(recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(1), hashFunction))
+ .getSegment(), newSegment1);
}
private static ImmutableSegmentImpl mockSegment(int sequenceNumber,
ThreadSafeMutableRoaringBitmap validDocIds) {
@@ -130,9 +142,10 @@ public class PartitionUpsertMetadataManagerTest {
return new PrimaryKey(new Object[]{value});
}
- private static void checkRecordLocation(Map<PrimaryKey, RecordLocation>
recordLocationMap, int keyValue,
- IndexSegment segment, int docId, int timestamp) {
- RecordLocation recordLocation =
recordLocationMap.get(getPrimaryKey(keyValue));
+ private static void checkRecordLocation(Map<Object, RecordLocation>
recordLocationMap, int keyValue,
+ IndexSegment segment, int docId, int timestamp,
UpsertConfig.HashFunction hashFunction) {
+ RecordLocation recordLocation =
+ recordLocationMap.get(PartitionUpsertMetadataManager.hashPrimaryKey(getPrimaryKey(keyValue), hashFunction));
assertNotNull(recordLocation);
assertSame(recordLocation.getSegment(), segment);
assertEquals(recordLocation.getDocId(), docId);
@@ -141,9 +154,15 @@ public class PartitionUpsertMetadataManagerTest {
@Test
public void testUpdateRecord() {
+ verifyUpdateRecord(UpsertConfig.HashFunction.NONE);
+ verifyUpdateRecord(UpsertConfig.HashFunction.MD5);
+ verifyUpdateRecord(UpsertConfig.HashFunction.MURMUR3);
+ }
+
+ private void verifyUpdateRecord(UpsertConfig.HashFunction hashFunction) {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null);
- Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null, hashFunction);
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
@@ -160,56 +179,62 @@ public class PartitionUpsertMetadataManagerTest {
IndexSegment segment2 = mockSegment(1, validDocIds2);
GenericRow row = mock(GenericRow.class);
- upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0, 100), row);
+ upsertMetadataManager.updateRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(3), 0,
100), row);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
- checkRecordLocation(recordLocationMap, 2, segment1, 2, 100);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0});
- upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1, 120), row);
+ upsertMetadataManager.updateRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(2), 1,
120), row);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
- upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2, 100), row);
+ upsertMetadataManager.updateRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(1), 2,
100), row);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment1, 0, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
- upsertMetadataManager
- .updateRecord(segment2, new
PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3, 100), row);
+ upsertMetadataManager.updateRecord(segment2,
+ new PartitionUpsertMetadataManager.RecordInfo(getPrimaryKey(0), 3,
100), row);
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
- checkRecordLocation(recordLocationMap, 0, segment2, 3, 100);
- checkRecordLocation(recordLocationMap, 1, segment1, 1, 120);
- checkRecordLocation(recordLocationMap, 2, segment2, 1, 120);
- checkRecordLocation(recordLocationMap, 3, segment2, 0, 100);
+ checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new
int[]{1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1, 3});
}
@Test
public void testRemoveSegment() {
+ verifyRemoveSegment(UpsertConfig.HashFunction.NONE);
+ verifyRemoveSegment(UpsertConfig.HashFunction.MD5);
+ verifyRemoveSegment(UpsertConfig.HashFunction.MURMUR3);
+ }
+
+ private void verifyRemoveSegment(UpsertConfig.HashFunction hashFunction) {
PartitionUpsertMetadataManager upsertMetadataManager =
- new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null);
- Map<PrimaryKey, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
+ new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
mock(ServerMetrics.class), null, hashFunction);
+ Map<Object, RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add 2 segments
// segment1: 0 -> {0, 100}, 1 -> {1, 100}
@@ -232,8 +257,27 @@ public class PartitionUpsertMetadataManagerTest {
// segment2: 2 -> {0, 100}, 3 -> {0, 100}
assertNull(recordLocationMap.get(getPrimaryKey(0)));
assertNull(recordLocationMap.get(getPrimaryKey(1)));
- checkRecordLocation(recordLocationMap, 2, segment2, 0, 100);
- checkRecordLocation(recordLocationMap, 3, segment2, 1, 100);
+ checkRecordLocation(recordLocationMap, 2, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 1, 100, hashFunction);
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new
int[]{0, 1});
}
+
+ @Test
+ public void testHashPrimaryKey() {
+ PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
+ Assert.assertEquals(BytesUtils.toHexString(
+ ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk,
UpsertConfig.HashFunction.MD5)).getBytes()),
+ "58de44997505014e02982846a4d1cbbd");
+ Assert.assertEquals(BytesUtils.toHexString(
+ ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk,
UpsertConfig.HashFunction.MURMUR3)).getBytes()),
+ "7e6b4a98296292a4012225fff037fa8c");
+ // reorder
+ pk = new PrimaryKey(new Object[]{"uuid-3", "uuid-2", "uuid-1"});
+ Assert.assertEquals(BytesUtils.toHexString(
+ ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk,
UpsertConfig.HashFunction.MD5)).getBytes()),
+ "d2df12c6dea7b83f965613614eee58e2");
+ Assert.assertEquals(BytesUtils.toHexString(
+ ((ByteArray) PartitionUpsertMetadataManager.hashPrimaryKey(pk,
UpsertConfig.HashFunction.MURMUR3)).getBytes()),
+ "8d68b314cc0c8de4dbd55f4dad3c3e66");
+ }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java
similarity index 60%
copy from
pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
copy to
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java
index 97faad8..e704cb1 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/HashUtilsTest.java
@@ -16,25 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.readers;
+package org.apache.pinot.segment.local.utils;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-
-
-public class PrimaryKeyTest {
+public class HashUtilsTest {
@Test
- public void testPrimaryKeyComparison() {
- PrimaryKey left = new PrimaryKey(new Object[]{"111", 2});
- PrimaryKey right = new PrimaryKey(new Object[]{"111", 2});
- assertEquals(left, right);
- assertEquals(left.hashCode(), right.hashCode());
-
- right = new PrimaryKey(new Object[]{"222", 2});
- assertNotEquals(left, right);
- assertNotEquals(left.hashCode(), right.hashCode());
+ public void testHashPlainValues() {
+ Assert.assertEquals(BytesUtils.toHexString(HashUtils.hashMD5("hello world".getBytes())),
+ "5eb63bbbe01eeed093cb22bb8f5acdc3");
+ Assert.assertEquals(BytesUtils.toHexString(HashUtils.hashMurmur3("hello world".getBytes())),
+ "0e617feb46603f53b163eb607d4697ab");
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index fc0888a..c028069 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1036,7 +1036,7 @@ public class TableConfigUtilsTest {
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.build();
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null)).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1045,7 +1045,7 @@ public class TableConfigUtilsTest {
}
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
null)).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null)).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1066,7 +1066,7 @@ public class TableConfigUtilsTest {
Map<String, String> streamConfigs = getStreamConfigs();
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
null)).setStreamConfigs(streamConfigs).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1076,7 +1076,7 @@ public class TableConfigUtilsTest {
streamConfigs.put("stream.kafka.consumer.type", "simple");
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null,
null)).setStreamConfigs(streamConfigs).build();
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
Assert.fail();
@@ -1086,7 +1086,7 @@ public class TableConfigUtilsTest {
}
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null))
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null))
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
TableConfigUtils.validateUpsertConfig(tableConfig, schema);
@@ -1094,7 +1094,7 @@ public class TableConfigUtilsTest {
StarTreeIndexConfig starTreeIndexConfig = new
StarTreeIndexConfig(Lists.newArrayList("myCol"), null, Collections
.singletonList(new
AggregationFunctionColumnPair(AggregationFunctionType.COUNT,
"myCol").toColumnName()), 10);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null))
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL, null, null,
null))
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
try {
@@ -1119,7 +1119,7 @@ public class TableConfigUtilsTest {
partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, null))
+ .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, null, null))
.setNullHandlingEnabled(false)
.setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setStreamConfigs(streamConfigs).build();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 3156c2d..2c3c597 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -286,6 +286,11 @@ public class TableConfig extends BaseJsonConfig {
}
@JsonIgnore
+ public UpsertConfig.HashFunction getHashFunction() {
+ return _upsertConfig == null ? UpsertConfig.HashFunction.NONE :
_upsertConfig.getHashFunction();
+ }
+
+ @JsonIgnore
public UpsertConfig.Mode getUpsertMode() {
return _upsertConfig == null ? UpsertConfig.Mode.NONE :
_upsertConfig.getMode();
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 3ad2b36..7e0be94 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -40,9 +40,16 @@ public class UpsertConfig extends BaseJsonConfig {
APPEND, INCREMENT, OVERWRITE, UNION
}
+ public enum HashFunction {
+ NONE, MD5, MURMUR3
+ }
+
@JsonPropertyDescription("Upsert mode.")
private final Mode _mode;
+ @JsonPropertyDescription("Function to hash the primary key.")
+ private final HashFunction _hashFunction;
+
@JsonPropertyDescription("Partial update strategies.")
private final Map<String, Strategy> _partialUpsertStrategies;
@@ -52,7 +59,8 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonCreator
public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
@JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy>
partialUpsertStrategies,
- @JsonProperty("comparisonColumn") @Nullable String comparisonColumn) {
+ @JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
+ @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
Preconditions.checkArgument(mode != null, "Upsert mode must be
configured");
_mode = mode;
@@ -63,12 +71,17 @@ public class UpsertConfig extends BaseJsonConfig {
}
_comparisonColumn = comparisonColumn;
+ _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
}
public Mode getMode() {
return _mode;
}
+ public HashFunction getHashFunction() {
+ return _hashFunction;
+ }
+
@Nullable
public Map<String, Strategy> getPartialUpsertStrategies() {
return _partialUpsertStrategies;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
index c6a2edf..b663cf6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/PrimaryKey.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.data.readers;
import java.util.Arrays;
+import org.apache.commons.lang3.SerializationUtils;
/**
@@ -35,6 +36,10 @@ public class PrimaryKey {
return _values;
}
+ public byte[] asBytes() {
+ return SerializationUtils.serialize(_values);
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
index 95cfd6e..6189025 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ByteArray.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.utils;
+import java.io.Serializable;
import java.util.Arrays;
import javax.annotation.Nonnull;
@@ -29,7 +30,7 @@ import javax.annotation.Nonnull;
* <li> Implements equals() and hashCode(), so it can be used as key for
HashMap/Set. </li>
* </ul>
*/
-public class ByteArray implements Comparable<ByteArray> {
+public class ByteArray implements Comparable<ByteArray>, Serializable {
private final byte[] _bytes;
public ByteArray(byte[] bytes) {
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index ab071f7..37b18a2 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -29,15 +29,18 @@ public class UpsertConfigTest {
@Test
public void testUpsertConfig() {
- UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL,
null, null);
+ UpsertConfig upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL,
null, null, null);
assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
- upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null,
"comparison");
+ upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null,
"comparison", null);
assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
+ upsertConfig1 = new UpsertConfig(UpsertConfig.Mode.FULL, null,
"comparison", UpsertConfig.HashFunction.MURMUR3);
+ assertEquals(upsertConfig1.getHashFunction(),
UpsertConfig.HashFunction.MURMUR3);
+
Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new
HashMap<>();
partialUpsertStratgies.put("myCol", UpsertConfig.Strategy.INCREMENT);
- UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, null);
+ UpsertConfig upsertConfig2 = new UpsertConfig(UpsertConfig.Mode.PARTIAL,
partialUpsertStratgies, null, null);
assertEquals(upsertConfig2.getPartialUpsertStrategies(),
partialUpsertStratgies);
}
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
index 97faad8..e1c531d 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/PrimaryKeyTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.spi.data.readers;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.pinot.spi.utils.ByteArray;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -37,4 +39,13 @@ public class PrimaryKeyTest {
assertNotEquals(left, right);
assertNotEquals(left.hashCode(), right.hashCode());
}
+
+ @Test
+ public void testSerialization() {
+ byte[] rawbytes = {0xa, 0x2, (byte) 0xff};
+ PrimaryKey pk = new PrimaryKey(new Object[]{"111", 2, new
ByteArray(rawbytes)});
+ byte[] bytes = pk.asBytes();
+ PrimaryKey deserialized = new PrimaryKey((Object[])
SerializationUtils.deserialize(bytes));
+ assertEquals(deserialized, pk);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]