findingrish commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1529975659
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
);
}
+ private void doPollSegmentAndSchema()
+ {
+ log.debug("Starting polling of segment and schema table");
+
+ ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats = new ConcurrentHashMap<>();
+
+ // some databases such as PostgreSQL require auto-commit turned off
+ // to stream results back, enabling transactions disables auto-commit
+ //
+ // setting connection to read-only will allow some database such as MySQL
+ // to automatically use read-only transaction mode, further optimizing the query
+ final List<DataSegment> segments = connector.inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
+ {
+ @Override
+ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
+ {
+ return handle
+ .createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(
+ new ResultSetMapper<DataSegment>()
+ {
+ @Override
+ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
+ {
+ try {
+ DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ segmentStats.put(
+ segment.getId(),
+ new SegmentSchemaCache.SegmentStats(
+ (Long) r.getObject("schema_id"),
+ (Long) r.getObject("num_rows")
+ )
+ );
+ return replaceWithExistingSegmentIfPresent(segment);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from db.").emit();
+ // If one entry in database is corrupted doPoll() should continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
+ return null;
+ }
+ }
+ }
+ )
+ .list();
+ }
+ }
+ );
+
+ Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+ String schemaPollQuery;
+ if (latestSegmentSchemaPoll == null) {
+ schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable());
+ } else {
+ schemaPollQuery = StringUtils.format(
+ "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'",
Review Comment:
Good point, the poll could miss a schema created at the same timestamp.
It could actually miss schemas created earlier as well.
Suppose a transaction with timestamp t < created_date failed and was still
retrying when the poll happened. The next poll would then miss those schemas
even if the poll condition were `greater than or equal to`.
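
To make the scenario concrete, here is a minimal, self-contained sketch of it.
It is not code from this PR: the `SchemaRow` class, the `poll` helper and the
timestamps are all hypothetical, and it assumes that a retried transaction
keeps the `created_date` it was originally assigned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MissedSchemaPollDemo
{
  // Hypothetical stand-in for a row of the segment schema table.
  static final class SchemaRow
  {
    final long id;
    final long createdDate;

    SchemaRow(long id, long createdDate)
    {
      this.id = id;
      this.createdDate = createdDate;
    }
  }

  // Simulates the incremental poll: ids of committed rows at or after the watermark.
  static List<Long> poll(List<SchemaRow> committed, long watermark, boolean inclusive)
  {
    List<Long> ids = new ArrayList<>();
    for (SchemaRow row : committed) {
      boolean matches = inclusive ? row.createdDate >= watermark : row.createdDate > watermark;
      if (matches) {
        ids.add(row.id);
      }
    }
    return ids;
  }

  // Returns the ids of committed rows that no poll ever returned.
  static List<Long> missedIds(boolean inclusive)
  {
    List<SchemaRow> table = new ArrayList<>();
    Set<Long> seen = new TreeSet<>();

    // Row 1 commits at t=100. Row 2 was stamped t=90, but its transaction
    // failed and is still retrying, so it is not visible yet.
    table.add(new SchemaRow(1, 100));

    // First poll is a full poll; the watermark becomes the max created_date seen.
    long watermark = 0;
    for (SchemaRow row : table) {
      seen.add(row.id);
      watermark = Math.max(watermark, row.createdDate);
    }

    // The retry now commits row 2 with its original timestamp (assumption),
    // and an unrelated row 3 commits at t=110.
    table.add(new SchemaRow(2, 90));
    table.add(new SchemaRow(3, 110));

    // Second poll is incremental and filters on the watermark.
    seen.addAll(poll(table, watermark, inclusive));

    List<Long> missed = new ArrayList<>();
    for (SchemaRow row : table) {
      if (!seen.contains(row.id)) {
        missed.add(row.id);
      }
    }
    return missed;
  }

  public static void main(String[] args)
  {
    System.out.println("missed with '>':  " + missedIds(false));
    System.out.println("missed with '>=': " + missedIds(true));
  }
}
```

In this sketch row 2 is skipped under both conditions, because its
`created_date` is already below the watermark by the time it becomes visible.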
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]