This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bbec0a8070 Handle unknown magic byte error in Confluent Avro decoder (#9045) (#9051)
bbec0a8070 is described below
commit bbec0a8070ce1d689a382195b291d68ca945c040
Author: Daniel del Castillo <[email protected]>
AuthorDate: Sun Jul 24 16:49:21 2022 +0100
Handle unknown magic byte error in Confluent Avro decoder (#9045) (#9051)
* Handle unknown magic byte error in Confluent Avro decoder (#9045)
* Update StreamMessageDecoder documentation
---
pinot-integration-tests/pom.xml | 25 ++
...ssageDecoderRealtimeClusterIntegrationTest.java | 313 +++++++++++++++++++++
.../schemaregistry/SchemaRegistryStarter.java | 105 +++++++
...aConfluentSchemaRegistryAvroMessageDecoder.java | 33 ++-
.../pinot/spi/stream/StreamMessageDecoder.java | 9 +-
5 files changed, 481 insertions(+), 4 deletions(-)
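For context, a realtime table opts into this decoder through its stream configs. The snippet below is a
minimal sketch mirroring what the new integration test wires up; the topic name and registry URL are
placeholders, not part of this commit:

    Map<String, String> streamConfigs = new HashMap<>();
    streamConfigs.put("streamType", "kafka");
    streamConfigs.put("stream.kafka.topic.name", "myTopic");
    streamConfigs.put("stream.kafka.decoder.class.name",
        "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder");
    streamConfigs.put("stream.kafka.decoder.prop.schema.registry.rest.url", "http://localhost:8081");

With this change, a message on that topic which is not Confluent-framed Avro (no magic byte / schema id
prefix) is logged and skipped instead of failing the consuming segment.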
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index a67a222d5b..3da652cff0 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -37,6 +37,7 @@
<localstack-utils.version>0.2.19</localstack-utils.version>
<awaitility.version>3.0.0</awaitility.version>
<aws.sdk.version>2.14.28</aws.sdk.version>
+ <testcontainers.version>1.17.3</testcontainers.version>
</properties>
<build>
@@ -323,6 +324,30 @@
<artifactId>pinot-yammer</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-confluent-avro</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>net.java.dev.jna</groupId>
+ <artifactId>jna</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>cloud.localstack</groupId>
<artifactId>localstack-utils</artifactId>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000000..21627741f1
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.primitives.Longs;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer.
+ * TODO: Add separate module-level tests and remove the randomness of this test
+ */
+public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest
+ extends RealtimeClusterIntegrationTest {
+ private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+ private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
+ "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
+ private static final long RANDOM_SEED = System.currentTimeMillis();
+ private static final Random RANDOM = new Random(RANDOM_SEED);
+
+ private final boolean _isDirectAlloc = RANDOM.nextBoolean();
+ private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+ private final boolean _enableSplitCommit = RANDOM.nextBoolean();
+ private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+ private final long _startTime = System.currentTimeMillis();
+ private SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;
+
+ @Override
+ protected int getNumKafkaBrokers() {
+ return 1;
+ }
+
+ @Override
+ protected void startKafka() {
+ super.startKafka();
+ startSchemaRegistry();
+ }
+
+ @Override
+ protected void stopKafka() {
+ stopSchemaRegistry();
+ super.stopKafka();
+ }
+
+ private void startSchemaRegistry() {
+ if (_schemaRegistry == null) {
+      _schemaRegistry = SchemaRegistryStarter.startLocalInstance(SchemaRegistryStarter.DEFAULT_PORT);
+ }
+ }
+
+ private void stopSchemaRegistry() {
+ try {
+ if (_schemaRegistry != null) {
+ _schemaRegistry.stop();
+ _schemaRegistry = null;
+ }
+ } catch (Exception e) {
+ // Swallow exceptions
+ }
+ }
+
+ @Override
+ protected void pushAvroIntoKafka(List<File> avroFiles)
+ throws Exception {
+ Properties avroProducerProps = new Properties();
+    avroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
+    avroProducerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl());
+ avroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ avroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "io.confluent.kafka.serializers.KafkaAvroSerializer");
+    Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps);
+
+ Properties nonAvroProducerProps = new Properties();
+    nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
+ nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+    Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps);
+
+ if (injectTombstones()) {
+      // publish lots of tombstones to livelock the consumer if it can't handle this properly
+ for (int i = 0; i < 1000; i++) {
+ // publish a tombstone first
+ nonAvroProducer.send(
+            new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null));
+ }
+ }
+ for (File avroFile : avroFiles) {
+      try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
+ for (GenericRecord genericRecord : reader) {
+          byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis())
+              : (genericRecord.get(getPartitionColumn())).toString().getBytes();
+ // Ignore getKafkaMessageHeader()
+          nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
+          avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord));
+ }
+ }
+ }
+ }
+
+ @Override
+ protected Map<String, String> getStreamConfigs() {
+ Map<String, String> streamConfigMap = super.getStreamConfigs();
+ String streamType = "kafka";
+ streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ KafkaConfluentSchemaRegistryAvroMessageDecoder.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.prop.schema.registry.rest.url", _schemaRegistry.getUrl());
+ return streamConfigMap;
+ }
+
+ @Override
+ protected boolean injectTombstones() {
+ return true;
+ }
+
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @Override
+ protected String getLoadMode() {
+ return ReadMode.mmap.name();
+ }
+
+ @Override
+ public void startController()
+ throws Exception {
+ Map<String, Object> properties = getDefaultControllerConfiguration();
+
+ properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
+ properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
+
+ startController(properties);
+
+    enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration configuration) {
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true);
+    configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc);
+ if (_isConsumerDirConfigured) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY);
+ }
+ if (_enableSplitCommit) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true);
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true);
+ }
+ }
+
+ @Override
+  protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig)
+ throws Exception {
+ if (!_tarDir.exists()) {
+ _tarDir.mkdir();
+ }
+ if (!_segmentDir.exists()) {
+ _segmentDir.mkdir();
+ }
+
+    // create segments out of the avro files (segments will be placed in _tarDir)
+ List<File> copyOfAvroFiles = new ArrayList<>(avroFiles);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+
+ // upload segments to controller
+ uploadSegmentsToController(getTableName(), _tarDir, false, false);
+
+ // upload the first segment again to verify refresh
+ uploadSegmentsToController(getTableName(), _tarDir, true, false);
+
+    // upload the first segment again to verify refresh with different segment crc
+ uploadSegmentsToController(getTableName(), _tarDir, true, true);
+
+    // add avro files to the original list so H2 will have the uploaded data as well
+ avroFiles.addAll(copyOfAvroFiles);
+ }
+
+  private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc)
+ throws Exception {
+ File[] segmentTarFiles = tarDir.listFiles();
+ assertNotNull(segmentTarFiles);
+ int numSegments = segmentTarFiles.length;
+ assertTrue(numSegments > 0);
+ if (onlyFirstSegment) {
+ numSegments = 1;
+ }
+    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+ if (numSegments == 1) {
+ File segmentTarFile = segmentTarFiles[0];
+ if (changeCrc) {
+ changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString());
+ }
+ assertEquals(
+            fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
+                tableName, TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK);
+ } else {
+ // Upload segments in parallel
+        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
+ List<Future<Integer>> futures = new ArrayList<>(numSegments);
+ for (File segmentTarFile : segmentTarFiles) {
+ futures.add(executorService.submit(
+              () -> fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
+                  segmentTarFile, tableName, TableType.REALTIME).getStatusCode()));
+ }
+ executorService.shutdown();
+ for (Future<Integer> future : futures) {
+ assertEquals((int) future.get(), HttpStatus.SC_OK);
+ }
+ }
+ }
+ }
+
+  private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) {
+ int startIdx = segmentFilePath.indexOf("mytable_");
+ int endIdx = segmentFilePath.indexOf(".tar.gz");
+ String segmentName = segmentFilePath.substring(startIdx, endIdx);
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+ segmentZKMetadata.setCrc(111L);
+    _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
+ }
+
+ @Override
+ protected long getCountStarResult() {
+    // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint
+ return super.getCountStarResult() * 2;
+ }
+
+ @BeforeClass
+ @Override
+ public void setUp()
+ throws Exception {
+ System.out.println(format(
+        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, "
+            + "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured,
+ _enableSplitCommit, _enableLeadControllerResource));
+
+ // Remove the consumer directory
+ FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
+
+ super.setUp();
+ }
+
+ @AfterClass
+ @Override
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
+ super.tearDown();
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java
new file mode 100644
index 0000000000..9d5c0ab53b
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.kafka.schemaregistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+
+public class SchemaRegistryStarter {
+ public static final int DEFAULT_PORT = 8081;
+ private static final String CONFLUENT_PLATFORM_VERSION = "7.2.0";
+ private static final DockerImageName KAFKA_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION);
+  private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryStarter.class);
+
+ private SchemaRegistryStarter() {
+ }
+
+ public static KafkaSchemaRegistryInstance startLocalInstance(int port) {
+    KafkaSchemaRegistryInstance kafkaSchemaRegistry = new KafkaSchemaRegistryInstance(port);
+ kafkaSchemaRegistry.start();
+ return kafkaSchemaRegistry;
+ }
+
+ public static class KafkaSchemaRegistryInstance {
+ private final int _port;
+ public KafkaContainer _kafkaContainer;
+ private Network _network;
+ private GenericContainer _schemaRegistryContainer;
+
+ private KafkaSchemaRegistryInstance(int port) {
+ _port = port;
+ }
+
+ public String getUrl() {
+      return "http://" + _schemaRegistryContainer.getHost() + ":" + _schemaRegistryContainer.getMappedPort(_port);
+ }
+
+ public void start() {
+ LOGGER.info("Starting schema registry");
+ if (_kafkaContainer != null || _schemaRegistryContainer != null) {
+ throw new IllegalStateException("Schema registry is already running");
+ }
+
+ _network = Network.newNetwork();
+
+      _kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE_NAME).withNetwork(_network).withNetworkAliases("kafka")
+          .withCreateContainerCmdModifier(it -> it.withHostName("kafka")).waitingFor(Wait.forListeningPort());
+ _kafkaContainer.start();
+
+ Map<String, String> schemaRegistryProps = new HashMap<>();
+      schemaRegistryProps.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092");
+ schemaRegistryProps.put("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + _port);
+ schemaRegistryProps.put("SCHEMA_REGISTRY_DEBUG", "true");
+ _schemaRegistryContainer =
+          new GenericContainer(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME).dependsOn(_kafkaContainer).withNetwork(_network)
+              .withNetworkAliases("schemaregistry").withEnv(schemaRegistryProps).withExposedPorts(_port)
+ .waitingFor(Wait.forListeningPort());
+ _schemaRegistryContainer.start();
+ }
+
+ public void stop() {
+ LOGGER.info("Stopping schema registry");
+ if (_schemaRegistryContainer != null) {
+ _schemaRegistryContainer.stop();
+ _schemaRegistryContainer = null;
+ }
+
+ if (_kafkaContainer != null) {
+ _kafkaContainer.stop();
+ _kafkaContainer = null;
+ }
+
+ if (_network != null) {
+ _network.close();
+ }
+ }
+ }
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
index 20c7981b44..f666d6401b 100644
--- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
@@ -31,6 +31,7 @@ import org.apache.avro.generic.GenericData.Record;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractorConfig;
@@ -38,6 +39,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkState;
@@ -48,6 +51,7 @@ import static com.google.common.base.Preconditions.checkState;
 * NOTE: Do not use schema in the implementation, as schema will be removed from the params
*/
 public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfluentSchemaRegistryAvroMessageDecoder.class);
   private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
private KafkaAvroDeserializer _deserializer;
@@ -103,12 +107,37 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMes
@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
-    Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload);
- return _avroRecordExtractor.extract(avroRecord, destination);
+ try {
+      Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload);
+ return _avroRecordExtractor.extract(avroRecord, destination);
+ } catch (RuntimeException e) {
+ ignoreOrRethrowException(e);
+ return null;
+ }
}
@Override
  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
}
+
+ /**
+   * This method handles specific serialisation exceptions. If the exception cannot be ignored the method
+ * re-throws the exception.
+ *
+ * @param e exception to handle
+ */
+ private void ignoreOrRethrowException(RuntimeException e) {
+ if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+ // Do nothing, the message is not an Avro message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+ return;
+ }
+ throw e;
+ }
+
+ private boolean isUnknownMagicByte(Throwable e) {
+    return e != null && e instanceof SerializationException && e.getMessage() != null && e.getMessage().toLowerCase()
+ .contains("unknown magic byte");
+ }
}
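To illustrate the new behaviour, here is a hypothetical usage sketch (not code from this commit); the
registry URL, field and topic names are placeholders:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class DecodeSketch {
      public static void main(String[] args) throws Exception {
        KafkaConfluentSchemaRegistryAvroMessageDecoder decoder = new KafkaConfluentSchemaRegistryAvroMessageDecoder();
        // "schema.registry.rest.url" is the decoder prop read in init(); the URL here is a placeholder
        decoder.init(Collections.singletonMap("schema.registry.rest.url", "http://localhost:8081"),
            Collections.singleton("someField"), "myTopic");

        // A payload without the Confluent magic byte / schema id header
        GenericRow row = decoder.decode("Rubbish".getBytes(StandardCharsets.UTF_8), new GenericRow());
        // row == null: the "unknown magic byte" SerializationException is logged and the message is skipped,
        // while any other RuntimeException is still re-thrown
      }
    }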
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index d2989c8ae8..89312f06b6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -52,8 +53,10 @@ public interface StreamMessageDecoder<T> {
* Decodes a row.
*
* @param payload The buffer from which to read the row.
- * @return A new row decoded from the buffer
+   * @return A new row decoded from the buffer. If the returned value is <code>null</code> the row is dropped from the
+ * segment.
*/
+ @Nullable
GenericRow decode(T payload, GenericRow destination);
/**
@@ -63,7 +66,9 @@ public interface StreamMessageDecoder<T> {
* @param offset The offset into the array from which the row contents starts
* @param length The length of the row contents in bytes
* @param destination The {@link GenericRow} to write the decoded row into
- * @return A new row decoded from the buffer
+   * @return A new row decoded from the buffer. If the returned value is <code>null</code> the row is dropped from the
+ * segment.
*/
+ @Nullable
GenericRow decode(T payload, int offset, int length, GenericRow destination);
}
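For decoder implementations outside this commit, the updated javadoc means "return null to drop the
message". A minimal sketch of a custom decoder relying on that contract (class and column names are
hypothetical):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Set;
    import javax.annotation.Nullable;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.stream.StreamMessageDecoder;

    public class Utf8OrDropDecoder implements StreamMessageDecoder<byte[]> {
      @Override
      public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName) {
      }

      @Nullable
      @Override
      public GenericRow decode(byte[] payload, GenericRow destination) {
        if (payload == null || payload.length == 0) {
          // Returning null drops this row from the segment instead of failing ingestion
          return null;
        }
        destination.putValue("value", new String(payload, StandardCharsets.UTF_8));
        return destination;
      }

      @Nullable
      @Override
      public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
        return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
      }
    }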
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]