This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f6f59ebedf Fix issues for realtime table reload (#9885)
f6f59ebedf is described below
commit f6f59ebedf8386fddb3e6895cc7c078acf64b43f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Dec 1 10:36:14 2022 -0800
Fix issues for realtime table reload (#9885)
---
.../manager/realtime/RealtimeTableDataManager.java | 10 +++--
.../query/executor/ServerQueryExecutorV1Impl.java | 49 +++++++++++-----------
.../tests/LLCRealtimeClusterIntegrationTest.java | 7 +++-
.../segment/index/loader/IndexLoadingConfig.java | 8 ++--
4 files changed, 40 insertions(+), 34 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 2c616c438c..6d5bfe1de8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -538,10 +538,14 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
* Replaces a committed LLC REALTIME segment.
*/
public void replaceLLSegment(String segmentName, IndexLoadingConfig
indexLoadingConfig) {
+ File indexDir = new File(_indexDir, segmentName);
+ // Use the latest table config and schema to load the segment
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to get table config
for table: %s", _tableNameWithType);
+ Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
tableConfig);
+ indexLoadingConfig.updateTableConfigAndSchema(tableConfig, schema);
try {
- File indexDir = new File(_indexDir, segmentName);
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig,
schema));
+ addSegment(indexDir, indexLoadingConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 83ded8ef3f..803621d52a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -81,7 +81,6 @@ import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,40 +209,38 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
// Gather stats for realtime consuming segments
+ // TODO: the freshness time should not be collected at query time because
there is no guarantee that the consuming
+ // segment is queried (consuming segment might be pruned, or the
server only contains relocated committed
+ // segments)
int numConsumingSegmentsQueried = 0;
- int numOnlineSegments = 0;
long minIndexTimeMs = 0;
long minIngestionTimeMs = 0;
long maxEndTimeMs = 0;
if (tableDataManager instanceof RealtimeTableDataManager) {
- numConsumingSegmentsQueried = 0;
- numOnlineSegments = 0;
minIndexTimeMs = Long.MAX_VALUE;
minIngestionTimeMs = Long.MAX_VALUE;
maxEndTimeMs = Long.MIN_VALUE;
for (IndexSegment indexSegment : indexSegments) {
+ SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
if (indexSegment instanceof MutableSegment) {
numConsumingSegmentsQueried += 1;
- SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
- if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
- minIndexTimeMs = indexTimeMs;
+ if (indexTimeMs > 0) {
+ minIndexTimeMs = Math.min(minIndexTimeMs, indexTimeMs);
}
long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
- if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs <
minIngestionTimeMs) {
- minIngestionTimeMs = ingestionTimeMs;
+ if (ingestionTimeMs > 0) {
+ minIngestionTimeMs = Math.min(minIngestionTimeMs, ingestionTimeMs);
}
} else if (indexSegment instanceof ImmutableSegment) {
- SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
long indexCreationTime = segmentMetadata.getIndexCreationTime();
- numOnlineSegments++;
- if (indexCreationTime != Long.MIN_VALUE) {
+ if (indexCreationTime > 0) {
maxEndTimeMs = Math.max(maxEndTimeMs, indexCreationTime);
} else {
// NOTE: the endTime may be totally inaccurate based on the value
added in the timeColumn
- Interval timeInterval = segmentMetadata.getTimeInterval();
- if (timeInterval != null) {
- maxEndTimeMs = Math.max(maxEndTimeMs,
timeInterval.getEndMillis());
+ long endTime = segmentMetadata.getEndTime();
+ if (endTime > 0) {
+ maxEndTimeMs = Math.max(maxEndTimeMs, endTime);
}
}
}
@@ -314,22 +311,24 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
}
if (tableDataManager instanceof RealtimeTableDataManager) {
- long minConsumingFreshnessTimeMs;
if (numConsumingSegmentsQueried > 0) {
- minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ?
minIngestionTimeMs : minIndexTimeMs;
instanceResponse.addMetadata(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
Integer.toString(numConsumingSegmentsQueried));
-
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
- Long.toString(minConsumingFreshnessTimeMs));
- LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
- numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
- } else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs !=
Long.MIN_VALUE) {
+ }
+ long minConsumingFreshnessTimeMs = 0;
+ if (minIngestionTimeMs != Long.MAX_VALUE) {
+ minConsumingFreshnessTimeMs = minIngestionTimeMs;
+ } else if (minIndexTimeMs != Long.MAX_VALUE) {
+ minConsumingFreshnessTimeMs = minIndexTimeMs;
+ } else if (maxEndTimeMs != Long.MIN_VALUE) {
minConsumingFreshnessTimeMs = maxEndTimeMs;
+ }
+ if (minConsumingFreshnessTimeMs > 0) {
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
- Long.toString(maxEndTimeMs));
- LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
- numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
+ Long.toString(minConsumingFreshnessTimeMs));
}
+ LOGGER.debug("Request {} queried {} consuming segments with
minConsumingFreshnessTimeMs: {}", requestId,
+ numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
}
LOGGER.debug("Query processing time for request Id - {}: {}", requestId,
queryProcessingTime);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 5c589bd9cb..69fab5c183 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -60,8 +60,9 @@ import static org.testng.Assert.assertTrue;
*/
public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+ // NOTE: The test query should match all the segments so that all the
segments are guaranteed to be reloaded
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
- "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+ "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = -9999";
private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
Collections.singletonList("DivActualElapsedTime");
private static final long RANDOM_SEED = System.currentTimeMillis();
private static final Random RANDOM = new Random(RANDOM_SEED);
@@ -237,6 +238,7 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertTrue(queryResponse.get("numEntriesScannedInFilter").asLong() > 0L);
+ long result =
queryResponse.get("resultTable").get("rows").get(0).get(0).asLong();
TableConfig tableConfig = getRealtimeTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
@@ -246,7 +248,8 @@ public class LLCRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegr
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
- // Total docs should not change during reload
+ // Query result and total docs should not change during reload
+
assertEquals(queryResponse1.get("resultTable").get("rows").get(0).get(0).asLong(),
result);
assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
long numConsumingSegmentsQueried =
queryResponse1.get("numConsumingSegmentsQueried").asLong();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 704d8f02c7..231e43408c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -105,7 +105,7 @@ public class IndexLoadingConfig {
public IndexLoadingConfig(InstanceDataManagerConfig
instanceDataManagerConfig, TableConfig tableConfig,
@Nullable Schema schema) {
extractFromInstanceConfig(instanceDataManagerConfig);
- extractFromTableConfigAndSchema(tableConfig, schema);
+ updateTableConfigAndSchema(tableConfig, schema);
}
@VisibleForTesting
@@ -117,7 +117,7 @@ public class IndexLoadingConfig {
public IndexLoadingConfig() {
}
- private void extractFromTableConfigAndSchema(TableConfig tableConfig,
@Nullable Schema schema) {
+ public void updateTableConfigAndSchema(TableConfig tableConfig, @Nullable
Schema schema) {
if (schema != null) {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
}
@@ -512,8 +512,8 @@ public class IndexLoadingConfig {
*/
@VisibleForTesting
public void setForwardIndexDisabledColumns(Set<String>
forwardIndexDisabledColumns) {
- _forwardIndexDisabledColumns = forwardIndexDisabledColumns == null ?
Collections.emptySet()
- : forwardIndexDisabledColumns;
+ _forwardIndexDisabledColumns =
+ forwardIndexDisabledColumns == null ? Collections.emptySet() :
forwardIndexDisabledColumns;
}
public Set<String> getNoDictionaryColumns() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]