github-advanced-security[bot] commented on code in PR #15817:
URL: https://github.com/apache/druid/pull/15817#discussion_r1530758427
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -1080,6 +1112,147 @@
);
}
+ private void doPollSegmentAndSchema()
+ {
+ log.info("Starting polling of segment and schema table");
+
+ ConcurrentMap<SegmentId, SegmentSchemaCache.SegmentStats> segmentStats =
new ConcurrentHashMap<>();
+
+ // some databases such as PostgreSQL require auto-commit turned off
+ // to stream results back, enabling transactions disables auto-commit
+ //
+ // setting connection to read-only will allow some database such as MySQL
+ // to automatically use read-only transaction mode, further optimizing the
query
+ final List<DataSegment> segments = connector.inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
+ {
+ @Override
+ public List<DataSegment> inTransaction(Handle handle,
TransactionStatus status)
+ {
+ return handle
+ .createQuery(StringUtils.format("SELECT payload, schema_id,
num_rows FROM %s WHERE used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(
+ new ResultSetMapper<DataSegment>()
+ {
+ @Override
+ public DataSegment map(int index, ResultSet r,
StatementContext ctx) throws SQLException
+ {
+ try {
+ DataSegment segment =
jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ segmentStats.put(
+ segment.getId(),
+ new SegmentSchemaCache.SegmentStats(
+ (Long) r.getObject("schema_id"),
+ (Long) r.getObject("num_rows")
+ )
+ );
+ return replaceWithExistingSegmentIfPresent(segment);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from
db.").emit();
+ // If one entry in database is corrupted doPoll()
should continue to work overall. See
+ // filter by `Objects::nonNull` below in this method.
+ return null;
+ }
+ }
+ }
+ )
+ .list();
+ }
+ }
+ );
+
+ Map<Long, SchemaPayload> schemaMap = new HashMap<>();
+
+ String schemaPollQuery;
+ if (latestSchemaId == null) {
+ schemaPollQuery = StringUtils.format("SELECT id, payload FROM %s",
getSegmentSchemaTable());
+ } else {
+ schemaPollQuery = StringUtils.format(
+ "SELECT id, payload FROM %1$s where id > '%2$s'",
+ getSegmentSchemaTable(),
+ latestSchemaId);
+ }
+ String finalSchemaPollQuery = schemaPollQuery;
+
+ final AtomicReference<Long> maxPolledId = new AtomicReference<>();
+ maxPolledId.set(latestSchemaId);
+
+ connector.inReadOnlyTransaction(new TransactionCallback<Object>()
+ {
+ @Override
+ public Object inTransaction(Handle handle, TransactionStatus status)
+ {
+ return handle.createQuery(finalSchemaPollQuery)
+ .map(new ResultSetMapper<Void>()
+ {
+ @Override
+ public Void map(int index, ResultSet r,
StatementContext ctx) throws SQLException
+ {
+ try {
+ Long id = r.getLong("id");
Review Comment:
## Boxed variable is never null
The variable 'id' is only assigned values of primitive type and is never
'null', but it is declared with the boxed type 'Long'.
[Show more
details](https://github.com/apache/druid/security/code-scanning/7155)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]