This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 02f6181 CompletionConfig for realtime tables (#4367)
02f6181 is described below
commit 02f6181e820d2905f3dc20ee7843cef08d0d8ec2
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Jul 8 13:18:13 2019 -0700
CompletionConfig for realtime tables (#4367)
Adds a completion config for realtime tables to specify completion-related
settings, such as the completion mode (DOWNLOAD or DEFAULT).
---
.../pinot/common/config/CompletionConfig.java | 63 ++++++++++++++++
.../SegmentsValidationAndRetentionConfig.java | 30 +++++---
.../apache/pinot/common/utils/CommonConstants.java | 8 ++
.../pinot/common/config/TableConfigTest.java | 28 +++++++
.../realtime/LLRealtimeSegmentDataManager.java | 85 +++++++++++++++-------
5 files changed, 179 insertions(+), 35 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/CompletionConfig.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/CompletionConfig.java
new file mode 100644
index 0000000..9eb072d
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/CompletionConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.pinot.common.utils.EqualityUtils;
+
+
+/**
+ * Configuration for realtime segment completion. Currently holds only the completion mode,
+ * which is kept as a raw string and is not validated by this class.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CompletionConfig {
+
+ @ConfigKey(value = "completionMode")
+ @ConfigDoc(value = "Mode to use when completing segment. DEFAULT for default
strategy (build segment if segment is equivalent to the committed segment, else
download). DOWNLOAD for always download the segment, never build.", mandatory =
false)
+ private String _completionMode;
+
+ public String getCompletionMode() {
+ return _completionMode;
+ }
+
+ public void setCompletionMode(String completionMode) {
+ _completionMode = completionMode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (EqualityUtils.isSameReference(this, o)) {
+ return true;
+ }
+
+ if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+ return false;
+ }
+
+ CompletionConfig that = (CompletionConfig) o;
+
+ return EqualityUtils.isEqual(_completionMode, that._completionMode);
+ }
+
+ @Override
+ public int hashCode() {
+ return EqualityUtils.hashCodeOf(_completionMode);
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
index e9eeef0..5f917da 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
@@ -66,6 +66,9 @@ public class SegmentsValidationAndRetentionConfig {
private ReplicaGroupStrategyConfig replicaGroupStrategyConfig;
@NestedConfig
+ private CompletionConfig _completionConfig;
+
+ @NestedConfig
private HllConfig hllConfig;
// Number of replicas per partition of low-level consumers. This config is
used for realtime tables only.
@@ -160,6 +163,14 @@ public class SegmentsValidationAndRetentionConfig {
this.replicaGroupStrategyConfig = replicaGroupStrategyConfig;
}
+ public CompletionConfig getCompletionConfig() {
+ return _completionConfig;
+ }
+
+ public void setCompletionConfig(CompletionConfig completionConfig) {
+ _completionConfig = completionConfig;
+ }
+
public HllConfig getHllConfig() {
return hllConfig;
}
@@ -222,15 +233,15 @@ public class SegmentsValidationAndRetentionConfig {
SegmentsValidationAndRetentionConfig that =
(SegmentsValidationAndRetentionConfig) o;
- return EqualityUtils.isEqual(retentionTimeUnit, that.retentionTimeUnit) &&
EqualityUtils
- .isEqual(retentionTimeValue, that.retentionTimeValue) && EqualityUtils
- .isEqual(segmentPushFrequency, that.segmentPushFrequency) &&
EqualityUtils
- .isEqual(segmentPushType, that.segmentPushType) &&
EqualityUtils.isEqual(replication, that.replication)
- && EqualityUtils.isEqual(schemaName, that.schemaName) && EqualityUtils
- .isEqual(timeColumnName, that.timeColumnName) &&
EqualityUtils.isEqual(_timeType, that._timeType)
- && EqualityUtils.isEqual(segmentAssignmentStrategy,
that.segmentAssignmentStrategy) && EqualityUtils
- .isEqual(replicaGroupStrategyConfig, that.replicaGroupStrategyConfig)
&& EqualityUtils
- .isEqual(hllConfig, that.hllConfig) &&
EqualityUtils.isEqual(replicasPerPartition, that.replicasPerPartition);
+ return EqualityUtils.isEqual(retentionTimeUnit, that.retentionTimeUnit) &&
EqualityUtils.isEqual(retentionTimeValue,
+ that.retentionTimeValue) &&
EqualityUtils.isEqual(segmentPushFrequency, that.segmentPushFrequency)
+ && EqualityUtils.isEqual(segmentPushType, that.segmentPushType) &&
EqualityUtils.isEqual(replication,
+ that.replication) && EqualityUtils.isEqual(schemaName,
that.schemaName) && EqualityUtils.isEqual(timeColumnName,
+ that.timeColumnName) && EqualityUtils.isEqual(_timeType,
that._timeType) && EqualityUtils.isEqual(
+ segmentAssignmentStrategy, that.segmentAssignmentStrategy) &&
EqualityUtils.isEqual(replicaGroupStrategyConfig,
+ that.replicaGroupStrategyConfig) &&
EqualityUtils.isEqual(_completionConfig, that._completionConfig)
+ && EqualityUtils.isEqual(hllConfig, that.hllConfig) &&
EqualityUtils.isEqual(replicasPerPartition,
+ that.replicasPerPartition);
}
@Override
@@ -245,6 +256,7 @@ public class SegmentsValidationAndRetentionConfig {
result = EqualityUtils.hashCodeOf(result, _timeType);
result = EqualityUtils.hashCodeOf(result, segmentAssignmentStrategy);
result = EqualityUtils.hashCodeOf(result, replicaGroupStrategyConfig);
+ result = EqualityUtils.hashCodeOf(result, _completionConfig);
result = EqualityUtils.hashCodeOf(result, hllConfig);
result = EqualityUtils.hashCodeOf(result, replicasPerPartition);
return result;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index ff68450..693e331 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -320,6 +320,14 @@ public class CommonConstants {
IN_PROGRESS, DONE
}
+ /**
+ * During realtime segment completion, the value of this enum decides
how non-winner servers should replace the completed segment.
+ */
+ public enum CompletionMode {
+ DEFAULT, // default behavior - if the in memory segment in the
non-winner server is equivalent to the committed segment, then build and
replace, else download
+ DOWNLOAD // non-winner servers always download the segment, never
build it
+ }
+
public static final String STATUS = "segment.realtime.status";
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
index 3821094..94fbe39 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/config/TableConfigTest.java
@@ -252,6 +252,21 @@ public class TableConfigTest {
checkTableConfigWithAssignmentConfig(tableConfig, tableConfigToCompare);
}
{
+ CompletionConfig completionConfig = new CompletionConfig();
+ completionConfig.setCompletionMode("DEFAULT");
+
+ TableConfig tableConfig =
+ tableConfigBuilder.build();
+ tableConfig.getValidationConfig().setCompletionConfig(completionConfig);
+
+ // Serialize then de-serialize
+ TableConfig tableConfigToCompare =
TableConfig.fromJsonConfig(tableConfig.toJsonConfig());
+ checkTableConfigWithCompletionConfig(tableConfig, tableConfigToCompare);
+
+ tableConfigToCompare =
TableConfig.fromZnRecord(tableConfig.toZNRecord());
+ checkTableConfigWithCompletionConfig(tableConfig, tableConfigToCompare);
+ }
+ {
// With default StreamConsumptionConfig
TableConfig tableConfig = tableConfigBuilder.build();
assertEquals(tableConfig.getIndexingConfig().getStreamConsumptionConfig().getStreamPartitionAssignmentStrategy(),
@@ -339,6 +354,19 @@ public class TableConfigTest {
assertEquals(strategyConfig.getPartitionColumn(), "memberId");
}
+ private void checkTableConfigWithCompletionConfig(TableConfig tableConfig,
TableConfig tableConfigToCompare) {
+ // Check that the completion configuration exists and matches the original.
+ assertEquals(tableConfigToCompare.getTableName(),
tableConfig.getTableName());
+
assertNotNull(tableConfigToCompare.getValidationConfig().getCompletionConfig());
+
assertEquals(tableConfigToCompare.getValidationConfig().getCompletionConfig(),
+ tableConfig.getValidationConfig().getCompletionConfig());
+
+ // Check that the completion mode has the expected value.
+ CompletionConfig completionConfig =
+ tableConfigToCompare.getValidationConfig().getCompletionConfig();
+ assertEquals(completionConfig.getCompletionMode(), "DEFAULT");
+ }
+
private void checkTableConfigWithStarTreeConfig(TableConfig tableConfig,
TableConfig tableConfigToCompare)
throws Exception {
// Check that the segment assignment configuration does exist.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index e2cb15c..b5c9060 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.config.CompletionConfig;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -48,6 +49,7 @@ import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import
org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.CompletionMode;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -549,12 +551,20 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
break;
case KEEP:
_state = State.RETAINING;
- success = buildSegmentAndReplace();
- if (success) {
- _state = State.RETAINED;
- } else {
- // Could not build segment for some reason. We can only
download it.
- _state = State.ERROR;
+ CompletionMode segmentCompletionMode =
getSegmentCompletionMode();
+ switch (segmentCompletionMode) {
+ case DOWNLOAD:
+ _state = State.DISCARDED;
+ break;
+ case DEFAULT:
+ success = buildSegmentAndReplace();
+ if (success) {
+ _state = State.RETAINED;
+ } else {
+ // Could not build segment for some reason. We can only
download it.
+ _state = State.ERROR;
+ }
+ break;
}
break;
case COMMIT:
@@ -602,6 +612,19 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
}
+ /**
+ * Returns the completion mode for this realtime table: DOWNLOAD if the table's completion config
+ * names it (case-insensitive), otherwise DEFAULT (including when no completion config is set).
+ */
+ private CompletionMode getSegmentCompletionMode() {
+ CompletionConfig completionConfig =
_tableConfig.getValidationConfig().getCompletionConfig();
+ if (completionConfig != null) {
+ if
(CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode()))
{
+ return CompletionMode.DOWNLOAD;
+ }
+ }
+ return CompletionMode.DEFAULT;
+ }
+
private File makeSegmentDirPath() {
return new File(_resourceDataDir, _segmentZKMetadata.getSegmentName());
}
@@ -912,27 +935,37 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
case CATCHING_UP:
case HOLDING:
case INITIAL_CONSUMING:
- // Allow to catch up upto final offset, and then replace.
- if (_currentOffset > endOffset) {
- // We moved ahead of the offset that is committed in ZK.
- segmentLogger.warn("Current offset {} ahead of the offset in zk
{}. Downloading to replace", _currentOffset,
- endOffset);
- downloadSegmentAndReplace(llcMetadata);
- } else if (_currentOffset == endOffset) {
- segmentLogger
- .info("Current offset {} matches offset in zk {}. Replacing
segment", _currentOffset, endOffset);
- buildSegmentAndReplace();
- } else {
- segmentLogger.info("Attempting to catch up from offset {} to {} ",
_currentOffset, endOffset);
- boolean success = catchupToFinalOffset(endOffset,
-
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS,
TimeUnit.SECONDS));
- if (success) {
- segmentLogger.info("Caught up to offset {}", _currentOffset);
- buildSegmentAndReplace();
- } else {
- segmentLogger.info("Could not catch up to offset (current = {}).
Downloading to replace", _currentOffset);
+ CompletionMode segmentCompletionMode = getSegmentCompletionMode();
+ switch (segmentCompletionMode) {
+ case DOWNLOAD:
+ segmentLogger.info("State {}. CompletionMode {}. Downloading to
replace", _state.toString(),
+ segmentCompletionMode);
downloadSegmentAndReplace(llcMetadata);
- }
+ break;
+ case DEFAULT:
+ // Allow to catch up upto final offset, and then replace.
+ if (_currentOffset > endOffset) {
+ // We moved ahead of the offset that is committed in ZK.
+ segmentLogger.warn("Current offset {} ahead of the offset in
zk {}. Downloading to replace", _currentOffset,
+ endOffset);
+ downloadSegmentAndReplace(llcMetadata);
+ } else if (_currentOffset == endOffset) {
+ segmentLogger.info("Current offset {} matches offset in zk {}.
Replacing segment", _currentOffset,
+ endOffset);
+ buildSegmentAndReplace();
+ } else {
+ segmentLogger.info("Attempting to catch up from offset {} to
{} ", _currentOffset, endOffset);
+ boolean success = catchupToFinalOffset(endOffset,
+
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS,
TimeUnit.SECONDS));
+ if (success) {
+ segmentLogger.info("Caught up to offset {}", _currentOffset);
+ buildSegmentAndReplace();
+ } else {
+ segmentLogger.info("Could not catch up to offset (current =
{}). Downloading to replace", _currentOffset);
+ downloadSegmentAndReplace(llcMetadata);
+ }
+ }
+ break;
}
break;
default:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]