clintropolis commented on code in PR #16813: URL: https://github.com/apache/druid/pull/16813#discussion_r1700904689
########## extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexing.kinesis.KinesisRecordSupplier; + +/** + * A {@link ByteEntity} generated by {@link KinesisRecordSupplier} and fed to any {@link InputFormat} used by kinesis + * indexing tasks. + * <p> + * It can be used as a regular ByteEntity, in which case the kinesis record value is returned, but the {@link #getRecord} + * method also allows Kinesis-aware {@link InputFormat} implementations to read the full kinesis record, including + * timestamp, encrytion key, patition key, and sequence number + * <p> + * NOTE: Any records with null values will be skipped, even if they contain non-null headers Review Comment: is this true? 
Looking at the reader, we return an event with just the "headers": ``` if (record.getRecord().getData() != null) { return buildBlendedRows(valueParser, mergedHeaderMap); } else { return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator()); } ``` ########## extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java: ########## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.kinesis; + +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KinesisInputReader implements InputEntityReader +{ + + private final InputRowSchema inputRowSchema; + private final SettableByteEntity<KinesisRecordEntity> source; + private final InputEntityReader valueParser; + private final String timestampColumnName; + + public KinesisInputReader( + InputRowSchema inputRowSchema, + SettableByteEntity<KinesisRecordEntity> source, + InputEntityReader valueParser, + String timestampColumnName + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.valueParser = valueParser; + this.timestampColumnName = timestampColumnName; + + } + + @Override + public CloseableIterator<InputRow> read() throws IOException + { + final KinesisRecordEntity record = source.getEntity(); + final Map<String, Object> mergedHeaderMap = extractHeaders(record); + + // Ignore tombstone records that have null values. 
Review Comment: this comment seems misleading: it looks like we still return a row containing only the timestamp rather than an empty iterator (Kafka seems to do the same) ########## extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.DateTimes; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +/** + * Kinesis aware InputFormat. Allows for reading kinesis specific values that are stored in the {@link Record}.
At + * this time, this input format only supports reading the main record payload ({@link Record#data}) and + * {@link Record#approximateArrivalTimestamp}, but can be extended easily to read other fields. + */ +public class KinesisInputFormat implements InputFormat +{ + private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; + public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; + + // Since KinesisInputFormat blends data from record timestamp, and payload, timestamp spec can be pointing to an attribute within one of these + // 2 sections. To handle scenarios where there is no timestamp value either in payload or record timestamp, we induce an artifical timestamp value + // to avoid unnecessary parser barf out. Users in such situations can use the inputFormat's kinesis record timestamp as its primary timestamp. Review Comment: Is it really possible for there to be no timestamp in `record.getApproximateArrivalTimestamp`? This comment exists in Kafka because both the value and the key use their own `InputFormat` to parse data before the results are blended together. I think in this case only the value payload could potentially be missing a timestamp; `record.getApproximateArrivalTimestamp` should always be present. (The Kafka comment refers to using the Kafka record timestamp as a fallback when either of those other two input formats does not provide a timestamp.) Also, nit: these comment lines are too long -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
