findingrish commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1530020329


##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
     );
   }
 
+  private void doPollSegmentAndSchema()
+  {
+    log.debug("Starting polling of segment and schema table");
+
+    ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = connector.inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+          {
+            return handle
+                .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
+                    new ResultSetMapper<DataSegment>()
+                    {
+                      @Override
+                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+                      {
+                        try {
+                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                          segmentStats.put(
+                              segment.getId(),
+                              new SegmentSchemaCache.SegmentStats(
+                                  (Long) r.getObject("schema_id"),
+                                  (Long) r.getObject("num_rows")
+                              )
+                          );
+                          return replaceWithExistingSegmentIfPresent(segment);
+                        }
+                        catch (IOException e) {
+                          log.makeAlert(e, "Failed to read segment from db.").emit();
+                          // If one entry in database is corrupted doPoll() should continue to work overall. See
+                          // filter by `Objects::nonNull` below in this method.
+                          return null;
+                        }
+                      }
+                    }
+                )
+                .list();
+          }
+        }
+    );
+
+    Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+    String schemaPollQuery;
+    if (latestSegmentSchemaPoll == null) {
+      schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable());
+    } else {
+      schemaPollQuery = StringUtils.format(
+          "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'",

Review Comment:
   I think I can update the query to filter on the auto-increment `id` column 
to fix this problem. 
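
   A rough sketch of what an `id`-based filter might look like, under stated assumptions: `SchemaPollCursor`, `lastPolledId`, `buildQuery`, and `advance` are hypothetical names for illustration and are not part of this PR or of Druid. The cursor plays the role that `latestSegmentSchemaPoll` plays in the diff above, but holds the largest `id` seen instead of a timestamp.

```java
import java.util.Locale;

/** Hypothetical helper (not Druid code): remembers the highest schema id polled so far. */
class SchemaPollCursor
{
  // null until the first poll has returned at least one row
  private Long lastPolledId = null;

  /** Full scan on the first poll; afterwards only rows with a larger id. */
  String buildQuery(String schemaTable)
  {
    if (lastPolledId == null) {
      return String.format(Locale.ROOT, "SELECT id, payload, created_date FROM %s", schemaTable);
    }
    // The id is a number tracked by this class, so it is formatted with %d and needs no quoting.
    return String.format(
        Locale.ROOT,
        "SELECT id, payload, created_date FROM %s WHERE id > %d",
        schemaTable,
        lastPolledId
    );
  }

  /** Moves the cursor to the largest id among the rows just fetched. */
  void advance(Iterable<Long> fetchedIds)
  {
    for (long id : fetchedIds) {
      if (lastPolledId == null || id > lastPolledId) {
        lastPolledId = id;
      }
    }
  }
}
```

   The caller would invoke `advance` with the ids returned by each poll, so that an empty result leaves the cursor unchanged.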
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

