AmatyaAvadhanula commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1522894697


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java:
##########
@@ -180,7 +191,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
     org.apache.commons.io.FileUtils.deleteQuietly(persistDir);
     FileUtils.mkdirp(persistDir);
 
-    final Set<DataSegment> pushedSegments = mergeAndPushSegments(
+    final Pair<Set<DataSegment>, MinimalSegmentSchemas> pushedSegments = mergeAndPushSegments(

Review Comment:
   Can `SegmentAndSchemas` be moved from multi-stage-query to a core module?
   It might be better to use `SegmentAndSchemas` here instead of a `Pair` of its fields.
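
   For illustration only, a minimal holder along these lines (hypothetical name and shape; the real `SegmentAndSchemas` lives in the multi-stage-query module and may differ) would already be more readable than a raw `Pair`:

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of a small holder to replace
// Pair<Set<DataSegment>, MinimalSegmentSchemas>; generics stand in
// for the Druid types so the sketch is self-contained.
class SegmentsAndSchemas<S, M> {
  private final Set<S> segments;
  private final M schemas;

  SegmentsAndSchemas(Set<S> segments, M schemas) {
    // Defensive wrapping: callers cannot mutate the pushed-segment set.
    this.segments = Collections.unmodifiableSet(segments);
    this.schemas = schemas;
  }

  Set<S> getSegments() {
    return segments;
  }

  M getSchemas() {
    return schemas;
  }
}
```

   Named accessors like `getSegments()` document intent at every call site, where `pair.lhs` / `pair.rhs` do not.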



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    log.debug("Starting polling of segment and schema table");
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+    String schemaPollQuery;
+    if (latestSegmentSchemaPoll == null) {
+      schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable());
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'",
+          getSegmentSchemaTable(),
+          latestSegmentSchemaPoll.toString());
+    }
+    String finalSchemaPollQuery = schemaPollQuery;
+    final DateTime[] maxCreatedDate = {latestSegmentSchemaPoll};

Review Comment:
   Why is this an array?
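   (For context on the idiom: a local variable captured by an anonymous inner class or lambda must be effectively final in Java, so a single-element array is a common workaround to mutate state from inside the callback. An `AtomicReference` states that intent more clearly. A self-contained sketch, not the Druid code:)

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class MaxTracker {
  // Tracks the maximum value seen while iterating via a callback.
  // AtomicReference replaces the single-element-array idiom: the
  // reference itself is effectively final, so the lambda may capture it,
  // while the value it holds can still be updated.
  static Integer maxOf(List<Integer> values) {
    AtomicReference<Integer> max = new AtomicReference<>(null);
    values.forEach(v -> {
      Integer current = max.get();
      if (current == null || v > current) {
        max.set(v);
      }
    });
    return max.get();
  }
}
```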



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2829,4 +2972,71 @@ public String toString()
     }
   }
 
+  public static class DataSegmentWithSchemaInformation

Review Comment:
   This can be private



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    log.debug("Starting polling of segment and schema table");

Review Comment:
   I think this can be `info`, to be consistent with the changes in https://github.com/apache/druid/pull/15952.



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    log.debug("Starting polling of segment and schema table");
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+    String schemaPollQuery;
+    if (latestSegmentSchemaPoll == null) {
+      schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable());
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'",

Review Comment:
   Should the `created_date` comparison be greater than or equal to?
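   One hedged illustration of the concern (hypothetical helper names, not the Druid code): with a strict `>` comparison, a row committed after a poll but sharing the exact `created_date` of the last polled row is skipped forever, whereas `>=` re-reads the boundary rows and relies on id-based de-duplication:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal stand-in for a segment_schemas row.
class SchemaRow {
  final long id;
  final long createdDate; // epoch millis stand-in for created_date

  SchemaRow(long id, long createdDate) {
    this.id = id;
    this.createdDate = createdDate;
  }
}

class SchemaPoller {
  private final Set<Long> seenIds = new HashSet<>();
  private Long lastPolledDate = null;

  // Inclusive poll: re-reads rows at the boundary timestamp (>=, not >)
  // and de-duplicates by id, so a late commit that shares the boundary
  // created_date is still picked up on the next poll.
  List<SchemaRow> poll(List<SchemaRow> table) {
    final Long cutoff = lastPolledDate;
    List<SchemaRow> fresh = new ArrayList<>();
    for (SchemaRow row : table) {
      boolean inWindow = cutoff == null || row.createdDate >= cutoff;
      if (inWindow && seenIds.add(row.id)) {
        fresh.add(row);
        if (lastPolledDate == null || row.createdDate > lastPolledDate) {
          lastPolledDate = row.createdDate;
        }
      }
    }
    return fresh;
  }
}
```

   With `>` in place of `>=`, the second poll in the test below would return nothing, silently dropping the late row.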



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -2245,29 +2376,41 @@ private void insertIntoUpgradeSegmentsTable(
     }
   }
 
-  private List<DataSegment> retrieveSegmentsById(Handle handle, String datasource, Set<String> segmentIds)
+  private List<DataSegmentWithSchemaInformation> retrieveSegmentsById(Handle handle, String datasource, Set<String> segmentIds)
   {
     if (segmentIds.isEmpty()) {
       return Collections.emptyList();
     }
 
     return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
-                                   .retrieveSegmentsById(datasource, segmentIds)
+                                   .retrieveSegmentsById(datasource, segmentIds, centralizedDatasourceSchemaConfig.isEnabled())
                                    .stream()
                                    .map(DataSegmentPlus::getDataSegment)
+        .map(v -> new DataSegmentWithSchemaInformation(v, null, null))

Review Comment:
   Why are `numRows` and `schemaId` always null here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

