leventov commented on a change in pull request #7653: Refactor SQLMetadataSegmentManager; Change contract of REST methods in DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r287358254
 
 

 ##########
 File path: server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
 ##########
 @@ -197,366 +320,489 @@ private Runnable createPollTaskForStartOrder(long startOrder)
   }
 
   @Override
-  @LifecycleStop
-  public void stop()
+  public void stopPollingDatabasePeriodically()
   {
-    ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
     lock.lock();
     try {
-      if (!isStarted()) {
+      if (!isPollingDatabasePeriodically()) {
         return;
       }
 
-      dataSources = null;
-      currentStartOrder = -1;
-      exec.shutdownNow();
-      exec = null;
+      periodicPollTaskFuture.cancel(false);
+      latestDatabasePoll = null;
+
+      // NOT nulling dataSources, allowing to query the latest polled data even when this SegmentsMetadata object is
+      // stopped.
+
+      currentStartPollingOrder = -1;
     }
     finally {
       lock.unlock();
     }
   }
 
-  private Pair<DataSegment, Boolean> usedPayloadMapper(
-      final int index,
-      final ResultSet resultSet,
-      final StatementContext context
-  ) throws SQLException
+  private void awaitOrPerformDatabasePoll()
   {
+    // Double-checked locking with awaitPeriodicOrFreshOnDemandDatabasePoll() call playing the role of the "check".
+    if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+      return;
+    }
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
     try {
-      return new Pair<>(
-          jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class),
-          resultSet.getBoolean("used")
-      );
+      if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+        return;
+      }
+      OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+      this.latestDatabasePoll = newOnDemandUpdate;
+      doOnDemandPoll(newOnDemandUpdate);
     }
-    catch (IOException e) {
-      throw new RuntimeException(e);
+    finally {
+      lock.unlock();
     }
   }
 
-  /**
-   * Gets a list of all datasegments that overlap the provided interval along with thier used status.
-   */
-  private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
-      final String dataSource,
-      final Interval interval
-  )
-  {
-    return connector.inReadOnlyTransaction(
-        (handle, status) -> handle.createQuery(
-            StringUtils.format(
-                "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start",
-                getSegmentsTable(),
-                connector.getQuoteString()
-            )
-        )
-        .setFetchSize(connector.getStreamingFetchSize())
-        .bind("dataSource", dataSource)
-        .bind("start", interval.getStart().toString())
-        .bind("end", interval.getEnd().toString())
-        .map(this::usedPayloadMapper)
-        .list()
-    );
-  }
-
-  private List<Pair<DataSegment, Boolean>> getDataSegments(
-      final String dataSource,
-      final Collection<String> segmentIds,
-      final Handle handle
-  )
+  private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
 
 Review comment:
   Renamed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to