mosche commented on a change in pull request #17113:
URL: https://github.com/apache/beam/pull/17113#discussion_r830838124
##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.java
##########
@@ -947,25 +1003,37 @@ private void validateExplicitHashKey(String hashKey) {
* with KCL to correctly implement the binary protocol, specifically {@link
* software.amazon.kinesis.retrieval.kpl.Messages.AggregatedRecord}.
*
- * <p>Note: The aggregation is a lot simpler than the one offered by KPL. While the KPL is aware
- * of effective hash key ranges assigned to each shard, we're not and don't want to be to keep
- * complexity manageable and avoid the risk of silently loosing records in the KCL:
+ * <p>To aggregate records the best possible way, records are assigned an explicit hash key that
+ * corresponds to the lower bound of the hash key range of the target shard. In case a record
+ * has already an explicit hash key assigned, it is kept unchanged.
*
- * <p>{@link software.amazon.kinesis.retrieval.AggregatorUtil#deaggregate(List, BigInteger,
- * BigInteger)} drops records not matching the expected hash key range.
+ * <p>Hash key ranges of shards are expected to be only slowly changing and get refreshed
+ * infrequently. If using an {@link ExplicitPartitioner} or disabling shard refresh via {@link
+ * RecordAggregation}, no shard details will be pulled.
*/
static class AggregatedWriter<T> extends Writer<T> {
private static final Logger LOG = LoggerFactory.getLogger(AggregatedWriter.class);
+ private static final ObjectPool<String, ShardRanges> SHARDRANGES_BY_STREAM =
+ new ObjectPool<>(ShardRanges::of);
private final RecordAggregation aggSpec;
private final Map<BigInteger, RecordsAggregator> aggregators;
- private final MessageDigest md5Digest;
+ private final PartitionKeyHasher pkHasher;
+
+ private final ShardRanges shardRanges;
AggregatedWriter(PipelineOptions options, Write<T> spec, RecordAggregation aggSpec) {
super(options, spec);
this.aggSpec = aggSpec;
- this.aggregators = new LinkedHashMap<>();
- this.md5Digest = md5Digest();
+ aggregators = new LinkedHashMap<>();
+ pkHasher = new PartitionKeyHasher();
+ if (aggSpec.shardRefreshInterval().isLongerThan(Duration.ZERO)
+ && !(spec.partitioner() instanceof ExplicitPartitioner)) {
+ shardRanges = SHARDRANGES_BY_STREAM.retain(spec.streamName());
+ shardRanges.refreshPeriodically(kinesis, aggSpec::nextShardRefresh);
+ } else {
+ shardRanges = ShardRanges.EMPTY;
Review comment:
No-op `ShardRanges` if using `ExplicitPartitioner` or if shard refresh is disabled.
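The updated Javadoc assigns each record an explicit hash key equal to the lower bound of its target shard's hash key range, and the comment above notes that a no-op `ShardRanges` is used when refresh is disabled or an `ExplicitPartitioner` is set. Below is a minimal, hypothetical sketch of that idea in plain Java, not Beam's implementation: `ShardLowerBoundSketch`, `Ranges`, `selectRanges` and `explicitHashKey` are illustrative names, not the actual `ShardRanges` or `PartitionKeyHasher` API. It assumes the known shard ranges are contiguous and keyed by their starting hash key, and hashes partition keys the way Kinesis does (MD5, read as an unsigned 128-bit integer). The pass-through behaviour of the empty variant is also an assumption of this sketch.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.TreeMap;

/** Hypothetical sketch, not Beam's ShardRanges / PartitionKeyHasher / AggregatedWriter. */
final class ShardLowerBoundSketch {

  /** MD5 digest of the partition key, read as an unsigned 128-bit integer. */
  static BigInteger hashPartitionKey(String partitionKey) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
      return new BigInteger(1, digest); // signum 1: interpret the bytes as unsigned
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 not available", e);
    }
  }

  /** Known shard ranges keyed by starting hash key (assumed contiguous from 0). */
  static final class Ranges {
    /** No-op variant (assumption): no shard details, so hash keys pass through unchanged. */
    static final Ranges EMPTY = new Ranges(new TreeMap<>());

    private final TreeMap<BigInteger, String> shardByStartingHashKey;

    Ranges(TreeMap<BigInteger, String> shardByStartingHashKey) {
      this.shardByStartingHashKey = shardByStartingHashKey;
    }

    /** Lower bound of the range containing hashKey, or hashKey itself if none is known. */
    BigInteger shardLowerBound(BigInteger hashKey) {
      BigInteger lower = shardByStartingHashKey.floorKey(hashKey);
      return lower != null ? lower : hashKey;
    }
  }

  /** Mirrors the condition in the diff: only use fetched ranges if refresh is on and keys aren't explicit. */
  static Ranges selectRanges(boolean refreshEnabled, boolean explicitPartitioner, Ranges fetched) {
    return refreshEnabled && !explicitPartitioner ? fetched : Ranges.EMPTY;
  }

  /** Keeps an existing explicit hash key; otherwise uses the target shard's lower bound. */
  static BigInteger explicitHashKey(Ranges ranges, String partitionKey, BigInteger existing) {
    if (existing != null) {
      return existing;
    }
    return ranges.shardLowerBound(hashPartitionKey(partitionKey));
  }

  public static void main(String[] args) {
    // Two hypothetical shards splitting the 128-bit hash key space in half.
    BigInteger mid = BigInteger.ONE.shiftLeft(127);
    TreeMap<BigInteger, String> shards = new TreeMap<>();
    shards.put(BigInteger.ZERO, "shard-0");
    shards.put(mid, "shard-1");
    Ranges ranges = selectRanges(true, false, new Ranges(shards));

    for (String pk : new String[] {"a", "b", "c"}) {
      System.out.println(pk + " -> " + explicitHashKey(ranges, pk, null));
    }
  }
}
```

In this sketch, all partition keys that hash into the same shard end up with the same explicit hash key, so a writer keyed like the diff's `Map<BigInteger, RecordsAggregator> aggregators` could collect them into a single aggregated record per shard.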