gianm commented on a change in pull request #7653: Refactor 
SQLMetadataSegmentManager; Change contract of REST methods in 
DataSourcesResource
URL: https://github.com/apache/incubator-druid/pull/7653#discussion_r286368954
 
 

 ##########
 File path: 
server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
 ##########
 @@ -197,366 +320,489 @@ private Runnable createPollTaskForStartOrder(long startOrder)
   }
 
   @Override
-  @LifecycleStop
-  public void stop()
+  public void stopPollingDatabasePeriodically()
   {
-    ReentrantReadWriteLock.WriteLock lock = startStopLock.writeLock();
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
     lock.lock();
     try {
-      if (!isStarted()) {
+      if (!isPollingDatabasePeriodically()) {
         return;
       }
 
-      dataSources = null;
-      currentStartOrder = -1;
-      exec.shutdownNow();
-      exec = null;
+      periodicPollTaskFuture.cancel(false);
+      latestDatabasePoll = null;
+
+      // NOT nulling dataSources, allowing to query the latest polled data even when this SegmentsMetadata object is
+      // stopped.
+
+      currentStartPollingOrder = -1;
     }
     finally {
       lock.unlock();
     }
   }
 
-  private Pair<DataSegment, Boolean> usedPayloadMapper(
-      final int index,
-      final ResultSet resultSet,
-      final StatementContext context
-  ) throws SQLException
+  private void awaitOrPerformDatabasePoll()
   {
+    // Double-checked locking with awaitPeriodicOrFreshOnDemandDatabasePoll() call playing the role of the "check".
+    if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+      return;
+    }
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
     try {
-      return new Pair<>(
-          jsonMapper.readValue(resultSet.getBytes("payload"), DataSegment.class),
-          resultSet.getBoolean("used")
-      );
+      if (awaitPeriodicOrFreshOnDemandDatabasePoll()) {
+        return;
+      }
+      OnDemandDatabasePoll newOnDemandUpdate = new OnDemandDatabasePoll();
+      this.latestDatabasePoll = newOnDemandUpdate;
+      doOnDemandPoll(newOnDemandUpdate);
     }
-    catch (IOException e) {
-      throw new RuntimeException(e);
+    finally {
+      lock.unlock();
     }
   }
 
-  /**
-   * Gets a list of all datasegments that overlap the provided interval along with thier used status.
-   */
-  private List<Pair<DataSegment, Boolean>> getDataSegmentsOverlappingInterval(
-      final String dataSource,
-      final Interval interval
-  )
-  {
-    return connector.inReadOnlyTransaction(
-        (handle, status) -> handle.createQuery(
-            StringUtils.format(
-                "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND start < :end AND %2$send%2$s > :start",
-                getSegmentsTable(),
-                connector.getQuoteString()
-            )
-        )
-        .setFetchSize(connector.getStreamingFetchSize())
-        .bind("dataSource", dataSource)
-        .bind("start", interval.getStart().toString())
-        .bind("end", interval.getEnd().toString())
-        .map(this::usedPayloadMapper)
-        .list()
-    );
-  }
-
-  private List<Pair<DataSegment, Boolean>> getDataSegments(
-      final String dataSource,
-      final Collection<String> segmentIds,
-      final Handle handle
-  )
+  private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
   {
-    return segmentIds.stream().map(
-        segmentId -> Optional.ofNullable(
-            handle.createQuery(
-                StringUtils.format(
-                    "SELECT used, payload FROM %1$s WHERE dataSource = :dataSource AND id = :id",
-                    getSegmentsTable()
-                )
-            )
-            .bind("dataSource", dataSource)
-            .bind("id", segmentId)
-            .map(this::usedPayloadMapper)
-            .first()
-        )
-        .orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId)))
-    )
-    .collect(Collectors.toList());
+    DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
 
 Review comment:
   It looks like the idea is that, if we are in the midst of a leadership epoch, 
then:
   
   - `isPollingDatabasePeriodically` returns true
   - `latestDatabasePoll` is a `PeriodicDatabasePoll`
   - if `awaitPeriodicOrFreshOnDemandDatabasePoll` is called after the 
leadership epoch starts, it is guaranteed to wait for that 
`PeriodicDatabasePoll` to complete its first poll
   
   Is that right? If so, it should provide the property I mentioned in another 
comment: leadership epoch N never uses an older snapshot than leadership epoch 
N-1. In that case, IMO it'd be nice to have some comments or assertions about 
these class invariants.
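
A minimal sketch of what such invariant checks might look like, written against simplified stand-in types rather than the PR's actual classes. The names `PollInvariants`, `checkPeriodicPollInvariant`, `awaitFirstPeriodicPoll`, and the `firstPollCompletionFuture` field are assumptions made for illustration; only `DatabasePoll`, `PeriodicDatabasePoll`, and `OnDemandDatabasePoll` appear in the diff, and their real definitions may differ.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins for the poll types named in the diff.
interface DatabasePoll {}

final class PeriodicDatabasePoll implements DatabasePoll {
  // Assumed field: completed when the first periodic poll of the epoch finishes.
  final CompletableFuture<Void> firstPollCompletionFuture = new CompletableFuture<>();
}

final class OnDemandDatabasePoll implements DatabasePoll {}

// Hypothetical helper class holding the invariant checks.
final class PollInvariants {
  private PollInvariants() {}

  /**
   * Invariant: while periodic polling is on (i.e. during a leadership epoch),
   * latestDatabasePoll must be a PeriodicDatabasePoll.
   */
  static void checkPeriodicPollInvariant(boolean pollingPeriodically, DatabasePoll latestDatabasePoll) {
    if (pollingPeriodically && !(latestDatabasePoll instanceof PeriodicDatabasePoll)) {
      throw new IllegalStateException(
          "Polling periodically, but latestDatabasePoll is not a PeriodicDatabasePoll: " + latestDatabasePoll
      );
    }
  }

  /**
   * Blocks until the first periodic poll has completed, so that a caller in
   * the current epoch never reads a snapshot older than that poll.
   * Returns false if the latest poll is not periodic; the caller then has to
   * decide whether an on-demand poll is fresh enough.
   */
  static boolean awaitFirstPeriodicPoll(DatabasePoll latestDatabasePoll) {
    if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
      ((PeriodicDatabasePoll) latestDatabasePoll).firstPollCompletionFuture.join();
      return true;
    }
    return false;
  }
}
```

In this sketch the check would be called wherever `latestDatabasePoll` is read under the lock, so a violation fails fast instead of silently serving a stale snapshot.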
