jsun98 edited a comment on issue #6431: Add Kinesis Indexing Service to core Druid
URL: https://github.com/apache/incubator-druid/pull/6431#issuecomment-428273955
 
 
   @b-slim thanks for the comments. The SeekableStream abstraction is meant for seekable, ordered, partitionable streams such as Kafka and Kinesis. I should have been more specific about this in the PR description and in the class names.
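   Here is a minimal sketch of what "seekable, ordered, partitionable" means in practice. The names `SeekableSupplier`, `InMemorySupplier`, `seek` and `poll` are hypothetical and are not the interfaces in this PR. The sketch only assumes that each partition is an ordered log whose records can be re-read from a given sequence number, which is what lets a task resume from a checkpoint.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical abstraction (not the PR's API): each partition is an ordered
   // log that can be re-read from any sequence number.
   interface SeekableSupplier<PartitionIdType, SequenceOffsetType> {
     // Position the reader of one partition at the given sequence number.
     void seek(PartitionIdType partition, SequenceOffsetType sequence);

     // Return the next record of the partition, in order, or null at the end.
     String poll(PartitionIdType partition);
   }

   // Toy in-memory implementation: integer partitions, long sequence numbers.
   class InMemorySupplier implements SeekableSupplier<Integer, Long> {
     private final Map<Integer, List<String>> logs = new HashMap<>();
     private final Map<Integer, Long> positions = new HashMap<>();

     void append(int partition, String value) {
       logs.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
     }

     @Override
     public void seek(Integer partition, Long sequence) {
       positions.put(partition, sequence);
     }

     @Override
     public String poll(Integer partition) {
       List<String> log = logs.getOrDefault(partition, new ArrayList<>());
       long pos = positions.getOrDefault(partition, 0L);
       if (pos >= log.size()) {
         return null; // nothing left in this partition
       }
       positions.put(partition, pos + 1);
       return log.get((int) pos);
     }
   }

   public class SeekDemo {
     public static void main(String[] args) {
       InMemorySupplier supplier = new InMemorySupplier();
       supplier.append(0, "a");
       supplier.append(0, "b");
       supplier.append(0, "c");
       // Skip "a", as a task resuming from a stored checkpoint would.
       supplier.seek(0, 1L);
       System.out.println(supplier.poll(0)); // b
       System.out.println(supplier.poll(0)); // c
       System.out.println(supplier.poll(0)); // null
     }
   }
   ```

   Kafka offsets and Kinesis sequence numbers would both play the role of `SequenceOffsetType` here, and Kafka partitions and Kinesis shards would both play the role of `PartitionIdType`.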
   
   ~~Regarding Pulsar, after digging into the docs for a bit, it seems that Pulsar is [partition-able](https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics), but it does not guarantee ordering within a partition when [shared subscription mode](https://pulsar.apache.org/docs/en/concepts-messaging/#subscription-modes) is used on the consumer side. So I think Pulsar and similar systems do not share enough characteristics with the Kafka/Kinesis indexing services to be grouped under the same abstraction.~~
   
   It seems Pulsar does support partitions and offers strict ordering within each partition when the failover subscription mode is used, so it should fit into the current abstraction.
   
   As for the `Record` class, it should probably be named something like `OrderedPartitionableRecord`, since it was not meant to serve as a generic record for every kind of streaming task. It holds an ordered list, `List<byte[]> data`, because in our current Kafka supervisor implementation a record's value is a single byte array, as in `IncrementalPublishingKafkaIndexTaskRunner`:
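   ```java
   // Each Kafka record carries exactly one value, as a byte[]
   final byte[] valueBytes = record.value();
   // A null value becomes a list holding a single null row; otherwise the
   // parser may produce one or more rows from the value
   final List<InputRow> rows = valueBytes == null
       ? Utils.nullableListOf((InputRow) null)
       : parser.parseBatch(ByteBuffer.wrap(valueBytes));
   ```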
   In `KinesisRecordSupplier`, on the other hand, when `deaggregate` is configured, `UserRecord.deaggregate()` returns a `List<UserRecord>`, which likely preserves the order in which the messages were added to the aggregator:
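   ```java
   if (deaggregate) {
       // A KPL-aggregated Kinesis record may pack several user records;
       // unpack them and collect each payload
       data = new ArrayList<>();

       final List<UserRecord> userRecords =
           UserRecord.deaggregate(Collections.singletonList(kinesisRecord));
       for (UserRecord userRecord : userRecords) {
           data.add(toByteArray(userRecord.getData()));
       }
   } else {
       // Without deaggregation, one Kinesis record maps to one payload
       data = Collections.singletonList(toByteArray(kinesisRecord.getData()));
   }
   ```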
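   A hypothetical sketch of such a record type follows, assuming it only needs a partition id, a sequence number, and the ordered payload list. The field and accessor names are illustrative and not taken from the PR. It shows the Kafka case wrapping its single value in a one-element list, while a deaggregated Kinesis record carries several payloads in one record.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical record type (illustrative names, not the PR's class):
   // partition + sequence locate the record; data holds the payloads in order.
   final class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> {
     private final PartitionIdType partitionId;
     private final SequenceOffsetType sequenceNumber;
     private final List<byte[]> data;

     OrderedPartitionableRecord(
         PartitionIdType partitionId, SequenceOffsetType sequenceNumber, List<byte[]> data) {
       this.partitionId = partitionId;
       this.sequenceNumber = sequenceNumber;
       // Copy so later changes to the caller's list cannot add, drop or reorder payloads.
       this.data = Collections.unmodifiableList(new ArrayList<>(data));
     }

     PartitionIdType getPartitionId() { return partitionId; }
     SequenceOffsetType getSequenceNumber() { return sequenceNumber; }
     List<byte[]> getData() { return data; }
   }

   public class RecordDemo {
     public static void main(String[] args) {
       // Kafka-style: one value per record, wrapped in a one-element list.
       OrderedPartitionableRecord<Integer, Long> kafkaRecord = new OrderedPartitionableRecord<>(
           0, 42L, Collections.singletonList("v".getBytes(StandardCharsets.UTF_8)));

       // Kinesis-style: a deaggregated record keeps its payloads in list order.
       List<byte[]> payloads = new ArrayList<>();
       payloads.add("first".getBytes(StandardCharsets.UTF_8));
       payloads.add("second".getBytes(StandardCharsets.UTF_8));
       OrderedPartitionableRecord<String, String> kinesisRecord =
           new OrderedPartitionableRecord<>("shard-0", "seq-1", payloads);

       System.out.println(kafkaRecord.getData().size());   // 1
       System.out.println(kinesisRecord.getData().size()); // 2
       System.out.println(new String(kinesisRecord.getData().get(1), StandardCharsets.UTF_8)); // second
     }
   }
   ```

   Using a list for both cases keeps downstream parsing uniform: the task can iterate over `getData()` without knowing whether the stream aggregates messages.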
   
   I agree that `<T1, T2>` should be changed to something more descriptive, like `<partitionType, sequenceType>`.
   
   I will update the PR description with more details and address the review comments individually.
