This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch revert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 353c7b106179b00e004b3e976bc6068d9b01b950
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Sep 23 13:36:40 2019 -0700

    Revert "Set processingException when all queried segments cannot be 
acquired (#3942)"
    
    This reverts commit d4f2ecef660ab1d4efa9696a53b0623aac867c3f.
---
 .../pinot/common/exception/QueryException.java     |  2 -
 .../core/data/manager/BaseTableDataManager.java    | 48 ++++-----------------
 .../core/data/manager/InstanceDataManager.java     | 14 ------
 .../pinot/core/data/manager/TableDataManager.java  | 15 -------
 .../query/executor/ServerQueryExecutorV1Impl.java  | 32 +-------------
 .../data/manager/BaseTableDataManagerTest.java     |  5 ---
 .../pinot/query/executor/QueryExecutorTest.java    | 50 ++--------------------
 .../starter/helix/HelixInstanceDataManager.java    | 16 -------
 .../SegmentOnlineOfflineStateModelFactory.java     |  5 ---
 9 files changed, 13 insertions(+), 174 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 6f83c8d..b0b9be5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -41,7 +41,6 @@ public class QueryException {
   public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160;
   public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170;
   public static final int ACCESS_DENIED_ERROR_CODE = 180;
-  public static final int SEGMENTS_MISSING_ERROR_CODE = 190;
   public static final int QUERY_EXECUTION_ERROR_CODE = 200;
   // TODO: Handle these errors in broker
   public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
@@ -98,7 +97,6 @@ public class QueryException {
   public static final ProcessingException QUERY_VALIDATION_ERROR = new 
ProcessingException(QUERY_VALIDATION_ERROR_CODE);
   public static final ProcessingException UNKNOWN_ERROR = new 
ProcessingException(UNKNOWN_ERROR_CODE);
   public static final ProcessingException QUOTA_EXCEEDED_ERROR = new 
ProcessingException(TOO_MANY_REQUESTS_ERROR_CODE);
-  public static final ProcessingException SEGMENTS_MISSING_ERROR = new 
ProcessingException(SEGMENTS_MISSING_ERROR_CODE);
 
   static {
     JSON_PARSING_ERROR.setMessage("JsonParsingError");
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 3f074a1..a456691 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -19,13 +19,10 @@
 package org.apache.pinot.core.data.manager;
 
 import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.ZNRecord;
@@ -45,13 +42,9 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public abstract class BaseTableDataManager implements TableDataManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseTableDataManager.class);
-  // cache deleted segment names for utmost this duration
-  private static final int MAX_CACHE_DURATION_SEC = 6 * 3600; // 6 hours
 
   protected final ConcurrentHashMap<String, SegmentDataManager> 
_segmentDataManagerMap = new ConcurrentHashMap<>();
 
-  protected Cache<String, Boolean> _deletedSegmentsCache;
-
   protected TableDataManagerConfig _tableDataManagerConfig;
   protected String _instanceId;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -66,7 +59,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       @Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore, @Nonnull 
ServerMetrics serverMetrics) {
     LOGGER.info("Initializing table data manager for table: {}", 
tableDataManagerConfig.getTableName());
 
-    _deletedSegmentsCache = 
CacheBuilder.newBuilder().expireAfterWrite(MAX_CACHE_DURATION_SEC, 
TimeUnit.SECONDS).build();
     _tableDataManagerConfig = tableDataManagerConfig;
     _instanceId = instanceId;
     _propertyStore = propertyStore;
@@ -125,8 +117,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
     ImmutableSegmentDataManager newSegmentManager = new 
ImmutableSegmentDataManager(immutableSegment);
     SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.put(segmentName, newSegmentManager);
-
-    // release old segment if needed
     if (oldSegmentManager == null) {
       _logger.info("Added new immutable segment: {} to table: {}", 
segmentName, _tableNameWithType);
     } else {
@@ -166,35 +156,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     }
   }
 
-  /**
-   * Called when a segment is deleted. The actual handling of segment delete 
is outside of this method.
-   * This method provides book-keeping around deleted segments.
-   * @param segmentName name of the segment to track.
-   */
-  public void notifySegmentDeleted(@Nonnull String segmentName) {
-    // add segment to the cache
-    _deletedSegmentsCache.put(segmentName, true);
-  }
-
-  /**
-   * Check if a segment is recently deleted.
-   *
-   * @param segmentName name of the segment to check.
-   * @return true if segment is in the cache, false otherwise
-   */
-  public boolean isRecentlyDeleted(@Nonnull String segmentName) {
-    return _deletedSegmentsCache.getIfPresent(segmentName) != null;
-  }
-
-  /**
-   * Remove a segment from the deleted cache if it is being added back.
-   *
-   * @param segmentName name of the segment that needs to removed from the 
cache (if needed)
-   */
-  public void notifySegmentAdded(@Nonnull String segmentName) {
-    _deletedSegmentsCache.invalidate(segmentName);
-  }
-
   @Nonnull
   @Override
   public List<SegmentDataManager> acquireAllSegments() {
@@ -215,6 +176,8 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       SegmentDataManager segmentDataManager = 
_segmentDataManagerMap.get(segmentName);
       if (segmentDataManager != null && 
segmentDataManager.increaseReferenceCount()) {
         segmentDataManagers.add(segmentDataManager);
+      } else {
+        handleMissingSegment(segmentName);
       }
     }
     return segmentDataManagers;
@@ -226,10 +189,17 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     if (segmentDataManager != null && 
segmentDataManager.increaseReferenceCount()) {
       return segmentDataManager;
     } else {
+      handleMissingSegment(segmentName);
       return null;
     }
   }
 
+  private void handleMissingSegment(String segmentName) {
+    // could not find segment
+    LOGGER.error("Could not find segment " + segmentName + " for table " + 
_tableNameWithType);
+    _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.NUM_MISSING_SEGMENTS, 1);
+  }
+
   @Override
   public void releaseSegment(@Nonnull SegmentDataManager segmentDataManager) {
     if (segmentDataManager.decreaseReferenceCount()) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 9399c7a..85090dc 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -79,20 +79,6 @@ public interface InstanceDataManager {
       throws Exception;
 
   /**
-   * Handles addition of a segment from the table.
-   *
-   * This method performs book keeping of added segments, especially if the 
deleted-cache needs to be invalidated
-   */
-  void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String 
segmentName);
-
-  /**
-   * Handles deletion of a segment from the table.
-   *
-   * This method performs book keeping of deleted segments.
-   */
-  void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String 
segmentName);
-
-  /**
    * Reloads a segment in a table.
    */
   void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String 
segmentName)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index 16eae6b..c237fbf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -80,21 +80,6 @@ public interface TableDataManager {
   void removeSegment(@Nonnull String segmentName);
 
   /**
-   * Track a deleted segment.
-   */
-  void notifySegmentDeleted(@Nonnull String segmentName);
-
-  /**
-   * Track addition of a segment
-   */
-  void notifySegmentAdded(@Nonnull String segmentName);
-
-  /**
-   * Check if a segment is recently deleted.
-   */
-  boolean isRecentlyDeleted(@Nonnull String segmentName);
-
-  /**
    * Acquires all segments of the table.
    * <p>It is the caller's responsibility to return the segments by calling 
{@link #releaseSegment(SegmentDataManager)}.
    *
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index b60471e..72072f2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.query.executor;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -127,24 +126,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
 
     TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
     Preconditions.checkState(tableDataManager != null, "Failed to find data 
manager for table: " + tableNameWithType);
-
-    // acquire the segments
-    int missingSegments = 0;
-    List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
-    List<SegmentDataManager> segmentDataManagers = new ArrayList<>();
-    for (String segmentName : segmentsToQuery) {
-      SegmentDataManager segmentDataManager = 
tableDataManager.acquireSegment(segmentName);
-      if (segmentDataManager != null) {
-        segmentDataManagers.add(segmentDataManager);
-      } else {
-        if (!tableDataManager.isRecentlyDeleted(segmentName)) {
-          LOGGER.error("Could not find segment {} for table {} for requestId 
{}", segmentName, tableNameWithType,
-              requestId);
-          missingSegments++;
-        }
-      }
-    }
-
+    List<SegmentDataManager> segmentDataManagers = 
tableDataManager.acquireSegments(queryRequest.getSegmentsToQuery());
     int numSegmentsQueried = segmentDataManagers.size();
     boolean enableTrace = queryRequest.isEnableTrace();
     if (enableTrace) {
@@ -244,18 +226,6 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, 
Integer.toString(numSegmentsQueried));
     dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, 
Long.toString(queryProcessingTime));
 
-    if (missingSegments > 0) {
-      // TODO: add this exception to the datatable after verfying the metrics
-      // Currently, given the deleted segments cache is in-memory only, a 
server restart will reset it
-      // We might end up sending partial-response metadata in such cases. It 
appears that the likelihood of
-      // this occurence is low; ie, segment has to be retained out and the 
server must be restarted while the
-      // broker view is still behind. We would however like to validate that 
and/or conf control this based on
-      // data.
-      
/*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR,
-          "Could not find " + missingSegments + " segments on the server"));*/
-      _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_MISSING_SEGMENTS, missingSegments);
-    }
-
     if (numConsumingSegmentsProcessed > 0) {
       dataTable.getMetadata().put(DataTable.NUM_CONSUMING_SEGMENTS_PROCESSED, 
Integer.toString(numConsumingSegmentsProcessed));
       dataTable.getMetadata().put(DataTable.MIN_CONSUMING_FRESHNESS_TIME_MS, 
Long.toString(minConsumingFreshnessTimeMs));
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index a19b288..9b28f5a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -158,11 +158,6 @@ public class BaseTableDataManagerTest {
     // Removing the segment again is fine.
     tableDataManager.removeSegment(segmentName);
 
-    // Delete the segment
-    tableDataManager.notifySegmentDeleted(segmentName);
-    // check that it is recorded as deleted
-    Assert.assertTrue(tableDataManager.isRecentlyDeleted(segmentName));
-
     // Add a new segment and remove it in order this time.
     final String anotherSeg = "AnotherSegment";
     ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index a4b3241..b6ce1e2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -68,7 +68,6 @@ public class QueryExecutorTest {
   private final List<ImmutableSegment> _indexSegments = new 
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
   private final List<String> _segmentNames = new 
ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
 
-  private InstanceDataManager _instanceDataManager;
   private ServerMetrics _serverMetrics;
   private QueryExecutor _queryExecutor;
 
@@ -106,8 +105,8 @@ public class QueryExecutorTest {
     for (ImmutableSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment(indexSegment);
     }
-    _instanceDataManager = mock(InstanceDataManager.class);
-    
when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
+    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
+    
when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
 
     // Set up the query executor
     resourceUrl = 
getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
@@ -116,7 +115,7 @@ public class QueryExecutorTest {
     queryExecutorConfig.setDelimiterParsingDisabled(false);
     queryExecutorConfig.load(new File(resourceUrl.getFile()));
     _queryExecutor = new ServerQueryExecutorV1Impl();
-    _queryExecutor.init(queryExecutorConfig, _instanceDataManager, 
_serverMetrics);
+    _queryExecutor.init(queryExecutorConfig, instanceDataManager, 
_serverMetrics);
   }
 
   @Test
@@ -155,49 +154,6 @@ public class QueryExecutorTest {
     Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0);
   }
 
-  @Test
-  public void testDeletedSegmentQuery() {
-    String query = "SELECT count(*) FROM " + TABLE_NAME;
-    _instanceDataManager.notifySegmentDeleted(TABLE_NAME, 
_segmentNames.get(0));
-
-    InstanceRequest instanceRequest = new InstanceRequest(0L, 
COMPILER.compileToBrokerRequest(query));
-    instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = 
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
-
-    for (String key : instanceResponse.getMetadata().keySet()) {
-      if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
-        Assert.fail("Response should not contain exceptions");
-      }
-    }
-  }
-
-  // TODO: enable this when the code is updated to set the exception
-  @Test(enabled=false)
-  public void testMissingSegmentQuery() {
-    String query = "SELECT count(*) FROM " + TABLE_NAME;
-
-    List<String> searchSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE + 
1);
-    searchSegments.addAll(_segmentNames);
-    searchSegments.add("NON_EXISTENT_SEGMENT");
-
-    InstanceRequest instanceRequest = new InstanceRequest(0L, 
COMPILER.compileToBrokerRequest(query));
-    instanceRequest.setSearchSegments(searchSegments);
-    DataTable instanceResponse = 
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
-
-    boolean exception = false;
-    for (String key : instanceResponse.getMetadata().keySet()) {
-      if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
-        // "null" below stems from a quirk around how the processing exception 
is built
-        Assert.assertEquals("null:\nCould not find 1 segments on the server", 
instanceResponse.getMetadata().get(key));
-        exception = true;
-      }
-    }
-    Assert.assertTrue(exception, "Expected missing segment exception");
-  }
-
-
   @AfterClass
   public void tearDown() {
     for (IndexSegment segment : _indexSegments) {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index aafeebe..2c14940 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -152,22 +152,6 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   }
 
   @Override
-  public void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull 
String segmentName) {
-    TableDataManager tableDataManager = 
_tableDataManagerMap.get(tableNameWithType);
-    if (tableDataManager != null) {
-      tableDataManager.notifySegmentAdded(segmentName);
-    }
-  }
-
-  @Override
-  public void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull 
String segmentName) {
-    TableDataManager tableDataManager = 
_tableDataManagerMap.get(tableNameWithType);
-    if (tableDataManager != null) {
-      tableDataManager.notifySegmentDeleted(segmentName);
-    }
-  }
-
-  @Override
   public void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String 
segmentName)
       throws Exception {
     LOGGER.info("Reloading single segment: {} in table: {}", segmentName, 
tableNameWithType);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index d915373..eea3b57 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -166,8 +166,6 @@ public class SegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<Sta
         } else {
           _instanceDataManager.addRealtimeSegment(tableNameWithType, 
segmentName);
         }
-        // handle any book-keeping after a segment is added
-        _instanceDataManager.notifySegmentAdded(tableNameWithType, 
segmentName);
       } catch (Exception e) {
         _logger.error("Caught exception in state transition from OFFLINE -> 
ONLINE for resource: {}, partition: {}",
             tableNameWithType, segmentName, e);
@@ -198,9 +196,6 @@ public class SegmentOnlineOfflineStateModelFactory extends 
StateModelFactory<Sta
       String tableNameWithType = message.getResourceName();
       String segmentName = message.getPartitionName();
 
-      // handle any additional book-keeping that needs to be done when a 
segment is dropped
-      _instanceDataManager.notifySegmentDeleted(tableNameWithType, 
segmentName);
-
       // This method might modify the file on disk. Use segment lock to 
prevent race condition
       Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, 
segmentName);
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to