jsun98 edited a comment on issue #6431: Add Kinesis Indexing Service to core 
Druid
URL: https://github.com/apache/incubator-druid/pull/6431#issuecomment-428273955
 
 
   @b-slim thanks for the comments. The motivation for creating the 
SeekableStream abstraction is for seekable order partition-able streams, like 
Kafka and Kinesis, I should have been more specific in the pr description and 
naming the classes. 
   
   Regarding Pulsar, after digging into the docs for a bit, it seems that 
Pulsar is 
[partition-able](https://pulsar.apache.org/docs/en/concepts-messaging/#partitioned-topics),
 but does not guarantee ordering within the partitions when [shared 
subscription 
mode](https://pulsar.apache.org/docs/en/concepts-messaging/#subscription-modes) 
is used on the consumer side. So I think Pulsar and other systems like it do 
not fit the characteristics / share enough similarities with Kafka/Kinesis 
indexing service to be grouped under the same abstraction. In my understanding 
the Kafka indexing service is designed to support types of systems with these 
characteristics:
   - ability to seek to a specific point in the stream - so that we can store 
the offsets transactionally in an external system to ensure correctness and can 
return to a specific record if we need to retry after a failure
   - partitionable - so that ingestion can be scaled out and handled by 
multiple indexing tasks
   - partition-ordered - so that we have deterministic results from replica 
tasks that are independently reading the same partition
   
   
   As for the `Record` class, it should probably be named something like 
`OrderedPartitionableRecord` as its purpose was not to serve as a generic 
record for any streaming task. The reason why it contains an order list of 
`List<byte[]> data` is because in our current Kafka supervisor implementation, 
a record's value is an array of bytes, as in 
`IncrementalPublishingKafkaIndexTaskRunner`
   ```java
   final byte[] valueBytes = record.value();
   final List<InputRow> rows = valueBytes == null
           ? Utils.nullableListOf((InputRow) null)
           : parser.parseBatch(ByteBuffer.wrap(valueBytes));
   ```
   and in `KinesisRecordSupplier`, when `deaggregate` is configured, 
`UserRecord.deaggregate()` returns a `List<UserRecord>`, which, is likely to 
maintain the order in which the messages were inputted into the aggregator
   ```java
   if (deaggregate) {
       data = new ArrayList<>();
   
       final List<UserRecord> userRecords = 
UserRecord.deaggregate(Collections.singletonList(kinesisRecord));
       for (UserRecord userRecord : userRecords) {
             data.add(toByteArray(userRecord.getData()));
       }
   } else {
       data = Collections.singletonList(toByteArray(kinesisRecord.getData()));
   }
   ```
   
   I agree that `<T1, T2>` should be changed to something more descriptive like 
`<partitionType, sequenceType>`
   
   I will update the pr description to include more details, and address the 
code reviews individually.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to