This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch revert in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 353c7b106179b00e004b3e976bc6068d9b01b950 Author: Jennifer Dai <[email protected]> AuthorDate: Mon Sep 23 13:36:40 2019 -0700 Revert "Set processingException when all queried segments cannot be acquired (#3942)" This reverts commit d4f2ecef660ab1d4efa9696a53b0623aac867c3f. --- .../pinot/common/exception/QueryException.java | 2 - .../core/data/manager/BaseTableDataManager.java | 48 ++++----------------- .../core/data/manager/InstanceDataManager.java | 14 ------ .../pinot/core/data/manager/TableDataManager.java | 15 ------- .../query/executor/ServerQueryExecutorV1Impl.java | 32 +------------- .../data/manager/BaseTableDataManagerTest.java | 5 --- .../pinot/query/executor/QueryExecutorTest.java | 50 ++-------------------- .../starter/helix/HelixInstanceDataManager.java | 16 ------- .../SegmentOnlineOfflineStateModelFactory.java | 5 --- 9 files changed, 13 insertions(+), 174 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index 6f83c8d..b0b9be5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -41,7 +41,6 @@ public class QueryException { public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160; public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170; public static final int ACCESS_DENIED_ERROR_CODE = 180; - public static final int SEGMENTS_MISSING_ERROR_CODE = 190; public static final int QUERY_EXECUTION_ERROR_CODE = 200; // TODO: Handle these errors in broker public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210; @@ -98,7 +97,6 @@ public class QueryException { public static final ProcessingException QUERY_VALIDATION_ERROR = new ProcessingException(QUERY_VALIDATION_ERROR_CODE); public static final ProcessingException UNKNOWN_ERROR = new ProcessingException(UNKNOWN_ERROR_CODE); public static final ProcessingException QUOTA_EXCEEDED_ERROR = new ProcessingException(TOO_MANY_REQUESTS_ERROR_CODE); - public static final ProcessingException SEGMENTS_MISSING_ERROR = new ProcessingException(SEGMENTS_MISSING_ERROR_CODE); static { JSON_PARSING_ERROR.setMessage("JsonParsingError"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 3f074a1..a456691 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -19,13 +19,10 @@ package org.apache.pinot.core.data.manager; import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.apache.helix.ZNRecord; @@ -45,13 +42,9 @@ import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BaseTableDataManager implements TableDataManager { private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class); - // cache deleted segment names for utmost this duration - private static final int MAX_CACHE_DURATION_SEC = 6 * 3600; // 6 hours protected final ConcurrentHashMap<String, SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>(); - protected Cache<String, Boolean> _deletedSegmentsCache; - protected TableDataManagerConfig _tableDataManagerConfig; protected String _instanceId; protected ZkHelixPropertyStore<ZNRecord> _propertyStore; @@ -66,7 +59,6 @@ public abstract class BaseTableDataManager implements TableDataManager { @Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore, @Nonnull ServerMetrics serverMetrics) { LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName()); - _deletedSegmentsCache = CacheBuilder.newBuilder().expireAfterWrite(MAX_CACHE_DURATION_SEC, TimeUnit.SECONDS).build(); _tableDataManagerConfig = tableDataManagerConfig; _instanceId = instanceId; _propertyStore = propertyStore; @@ -125,8 +117,6 @@ public abstract class BaseTableDataManager implements TableDataManager { ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment); SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager); - - // release old segment if needed if (oldSegmentManager == null) { _logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType); } else { @@ -166,35 +156,6 @@ public abstract class BaseTableDataManager implements TableDataManager { } } - /** - * Called when a segment is deleted. The actual handling of segment delete is outside of this method. - * This method provides book-keeping around deleted segments. - * @param segmentName name of the segment to track. - */ - public void notifySegmentDeleted(@Nonnull String segmentName) { - // add segment to the cache - _deletedSegmentsCache.put(segmentName, true); - } - - /** - * Check if a segment is recently deleted. - * - * @param segmentName name of the segment to check. - * @return true if segment is in the cache, false otherwise - */ - public boolean isRecentlyDeleted(@Nonnull String segmentName) { - return _deletedSegmentsCache.getIfPresent(segmentName) != null; - } - - /** - * Remove a segment from the deleted cache if it is being added back. - * - * @param segmentName name of the segment that needs to removed from the cache (if needed) - */ - public void notifySegmentAdded(@Nonnull String segmentName) { - _deletedSegmentsCache.invalidate(segmentName); - } - @Nonnull @Override public List<SegmentDataManager> acquireAllSegments() { @@ -215,6 +176,8 @@ public abstract class BaseTableDataManager implements TableDataManager { SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) { segmentDataManagers.add(segmentDataManager); + } else { + handleMissingSegment(segmentName); } } return segmentDataManagers; @@ -226,10 +189,17 @@ public abstract class BaseTableDataManager implements TableDataManager { if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) { return segmentDataManager; } else { + handleMissingSegment(segmentName); return null; } } + private void handleMissingSegment(String segmentName) { + // could not find segment + LOGGER.error("Could not find segment " + segmentName + " for table " + _tableNameWithType); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, 1); + } + @Override public void releaseSegment(@Nonnull SegmentDataManager segmentDataManager) { if (segmentDataManager.decreaseReferenceCount()) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 9399c7a..85090dc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -79,20 +79,6 @@ public interface InstanceDataManager { throws Exception; /** - * Handles addition of a segment from the table. - * - * This method performs book keeping of added segments, especially if the deleted-cache needs to be invalidated - */ - void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName); - - /** - * Handles deletion of a segment from the table. - * - * This method performs book keeping of deleted segments. - */ - void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName); - - /** * Reloads a segment in a table. */ void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java index 16eae6b..c237fbf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java @@ -80,21 +80,6 @@ public interface TableDataManager { void removeSegment(@Nonnull String segmentName); /** - * Track a deleted segment. - */ - void notifySegmentDeleted(@Nonnull String segmentName); - - /** - * Track addition of a segment - */ - void notifySegmentAdded(@Nonnull String segmentName); - - /** - * Check if a segment is recently deleted. - */ - boolean isRecentlyDeleted(@Nonnull String segmentName); - - /** * Acquires all segments of the table. * <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}. * diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index b60471e..72072f2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -19,7 +19,6 @@ package org.apache.pinot.core.query.executor; import com.google.common.base.Preconditions; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -127,24 +126,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: " + tableNameWithType); - - // acquire the segments - int missingSegments = 0; - List<String> segmentsToQuery = queryRequest.getSegmentsToQuery(); - List<SegmentDataManager> segmentDataManagers = new ArrayList<>(); - for (String segmentName : segmentsToQuery) { - SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); - if (segmentDataManager != null) { - segmentDataManagers.add(segmentDataManager); - } else { - if (!tableDataManager.isRecentlyDeleted(segmentName)) { - LOGGER.error("Could not find segment {} for table {} for requestId {}", segmentName, tableNameWithType, - requestId); - missingSegments++; - } - } - } - + List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(queryRequest.getSegmentsToQuery()); int numSegmentsQueried = segmentDataManagers.size(); boolean enableTrace = queryRequest.isEnableTrace(); if (enableTrace) { @@ -244,18 +226,6 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried)); dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime)); - if (missingSegments > 0) { - // TODO: add this exception to the datatable after verfying the metrics - // Currently, given the deleted segments cache is in-memory only, a server restart will reset it - // We might end up sending partial-response metadata in such cases. It appears that the likelihood of - // this occurence is low; ie, segment has to be retained out and the server must be restarted while the - // broker view is still behind. We would however like to validate that and/or conf control this based on - // data. - /*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR, - "Could not find " + missingSegments + " segments on the server"));*/ - _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, missingSegments); - } - if (numConsumingSegmentsProcessed > 0) { dataTable.getMetadata().put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, Integer.toString(numConsumingSegmentsProcessed)); dataTable.getMetadata().put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, Long.toString(minConsumingFreshnessTimeMs)); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index a19b288..9b28f5a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -158,11 +158,6 @@ public class BaseTableDataManagerTest { // Removing the segment again is fine. tableDataManager.removeSegment(segmentName); - // Delete the segment - tableDataManager.notifySegmentDeleted(segmentName); - // check that it is recorded as deleted - Assert.assertTrue(tableDataManager.isRecentlyDeleted(segmentName)); - // Add a new segment and remove it in order this time. final String anotherSeg = "AnotherSegment"; ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs); diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java index a4b3241..b6ce1e2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java @@ -68,7 +68,6 @@ public class QueryExecutorTest { private final List<ImmutableSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE); private final List<String> _segmentNames = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE); - private InstanceDataManager _instanceDataManager; private ServerMetrics _serverMetrics; private QueryExecutor _queryExecutor; @@ -106,8 +105,8 @@ public class QueryExecutorTest { for (ImmutableSegment indexSegment : _indexSegments) { tableDataManager.addSegment(indexSegment); } - _instanceDataManager = mock(InstanceDataManager.class); - when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager); + InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); + when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager); // Set up the query executor resourceUrl = getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH); @@ -116,7 +115,7 @@ public class QueryExecutorTest { queryExecutorConfig.setDelimiterParsingDisabled(false); queryExecutorConfig.load(new File(resourceUrl.getFile())); _queryExecutor = new ServerQueryExecutorV1Impl(); - _queryExecutor.init(queryExecutorConfig, _instanceDataManager, _serverMetrics); + _queryExecutor.init(queryExecutorConfig, instanceDataManager, _serverMetrics); } @Test @@ -155,49 +154,6 @@ public class QueryExecutorTest { Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0); } - @Test - public void testDeletedSegmentQuery() { - String query = "SELECT count(*) FROM " + TABLE_NAME; - _instanceDataManager.notifySegmentDeleted(TABLE_NAME, _segmentNames.get(0)); - - InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query)); - instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L); - - for (String key : instanceResponse.getMetadata().keySet()) { - if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) { - Assert.fail("Response should not contain exceptions"); - } - } - } - - // TODO: enable this when the code is updated to set the exception - @Test(enabled=false) - public void testMissingSegmentQuery() { - String query = "SELECT count(*) FROM " + TABLE_NAME; - - List<String> searchSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE + 1); - searchSegments.addAll(_segmentNames); - searchSegments.add("NON_EXISTENT_SEGMENT"); - - InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query)); - instanceRequest.setSearchSegments(searchSegments); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L); - - boolean exception = false; - for (String key : instanceResponse.getMetadata().keySet()) { - if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) { - // "null" below stems from a quirk around how the processing exception is built - Assert.assertEquals("null:\nCould not find 1 segments on the server", instanceResponse.getMetadata().get(key)); - exception = true; - } - } - Assert.assertTrue(exception, "Expected missing segment exception"); - } - - @AfterClass public void tearDown() { for (IndexSegment segment : _indexSegments) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index aafeebe..2c14940 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -152,22 +152,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { } @Override - public void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName) { - TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); - if (tableDataManager != null) { - tableDataManager.notifySegmentAdded(segmentName); - } - } - - @Override - public void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName) { - TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); - if (tableDataManager != null) { - tableDataManager.notifySegmentDeleted(segmentName); - } - } - - @Override public void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName) throws Exception { LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index d915373..eea3b57 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -166,8 +166,6 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta } else { _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName); } - // handle any book-keeping after a segment is added - _instanceDataManager.notifySegmentAdded(tableNameWithType, segmentName); } catch (Exception e) { _logger.error("Caught exception in state transition from OFFLINE -> ONLINE for resource: {}, partition: {}", tableNameWithType, segmentName, e); @@ -198,9 +196,6 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta String tableNameWithType = message.getResourceName(); String segmentName = message.getPartitionName(); - // handle any additional book-keeping that needs to be done when a segment is dropped - _instanceDataManager.notifySegmentDeleted(tableNameWithType, segmentName); - // This method might modify the file on disk. Use segment lock to prevent race condition Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName); try { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
