clintropolis commented on a change in pull request #10839: URL: https://github.com/apache/druid/pull/10839#discussion_r570892484
########## File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/FileBasedProtobufBytesDecoder.java ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.protobuf.dynamic.DynamicSchema; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.Set; + +public class FileBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + private final String descriptorFilePath; + private final String protoMessageType; + private Descriptors.Descriptor descriptor; + + + @JsonCreator + public FileBasedProtobufBytesDecoder( + 
@JsonProperty("descriptor") String descriptorFilePath, + @JsonProperty("protoMessageType") String protoMessageType + ) + { + this.descriptorFilePath = descriptorFilePath; + this.protoMessageType = protoMessageType; + initDescriptor(); + } + + @VisibleForTesting + void initDescriptor() + { + if (this.descriptor == null) { + this.descriptor = getDescriptor(descriptorFilePath); + } + } + + @SuppressWarnings("checkstyle:RightCurly") Review comment: what is this suppression for? ########## File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java ########## @@ -37,39 +32,32 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.ParseSpec; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.Parser; import org.apache.druid.utils.CollectionUtils; -import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Set; public class ProtobufInputRowParser implements ByteBufferInputRowParser { + private static final Logger LOG = new Logger(ByteBufferInputRowParser.class); + private final ParseSpec parseSpec; - private final String descriptorFilePath; - private final String protoMessageType; - private Descriptor descriptor; + private final ProtobufBytesDecoder protobufBytesDecoder; private Parser<String, Object> parser; private final List<String> dimensions; @JsonCreator public ProtobufInputRowParser( @JsonProperty("parseSpec") ParseSpec parseSpec, - @JsonProperty("descriptor") String descriptorFilePath, - @JsonProperty("protoMessageType") String protoMessageType + @JsonProperty("protoBytesDecoder") 
ProtobufBytesDecoder protobufBytesDecoder Review comment: So that this change stays compatible with previous Druid versions, I think we should probably consider leaving the old constructor properties in place and marking them as deprecated instead of removing them, and then enforce that only one of `protoBytesDecoder` or `descriptor`/`protoMessageType` can be set, and then automatically construct a `FileBasedProtobufBytesDecoder` if the old properties are set and the new one is null. Then in a future version we can consider dropping them. ########## File path: distribution/bin/check-licenses.py ########## @@ -225,6 +225,9 @@ def build_compatible_license_names(): compatible_licenses['Apache License v2.0'] = 'Apache License version 2.0' compatible_licenses['Apache License, version 2.0'] = 'Apache License version 2.0' compatible_licenses['Apache 2.0 License'] = 'Apache License version 2.0' + compatible_licenses['Apache License, 2.0'] = 'Apache License version 2.0' + compatible_licenses['Confluent Community License'] = 'Confluent Community License' Review comment: Hmm, what requires the Confluent Community License? I don't see anything in licenses.yaml mentioning it. Last I knew, the clients to Confluent things were still Apache licensed (I'm not sure we can distribute anything with this license in Druid packages, would have to check into that, but I suspect not since it has field-of-use restrictions). ########## File path: extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/SchemaRegistryBasedProtobufBytesDecoder.java ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.ParseException; + +import java.nio.ByteBuffer; +import java.util.Collections; + +public class SchemaRegistryBasedProtobufBytesDecoder implements ProtobufBytesDecoder +{ + + private static final Logger LOGGER = new Logger(SchemaRegistryBasedProtobufBytesDecoder.class); + + private final SchemaRegistryClient registry; + private int identityMapCapacity; + + @JsonCreator + public SchemaRegistryBasedProtobufBytesDecoder( + @JsonProperty("url") String url, + @JsonProperty("capacity") Integer capacity + ) + { + this.identityMapCapacity = capacity == null ? 
Integer.MAX_VALUE : capacity; + registry = new CachedSchemaRegistryClient(Collections.singletonList(url), identityMapCapacity, Collections.singletonList(new ProtobufSchemaProvider()), null); + } + + @VisibleForTesting + int getIdentityMapCapacity() + { + return this.identityMapCapacity; + } + + @VisibleForTesting + SchemaRegistryBasedProtobufBytesDecoder(SchemaRegistryClient registry) + { + this.registry = registry; + } + + @Override + public DynamicMessage parse(ByteBuffer bytes) + { + try { + bytes.get(); // ignore first \0 byte + int id = bytes.getInt(); // extract schema registry id + bytes.get(); // ignore \0 byte before PB message + int length = bytes.limit() - 2 - 4; + ProtobufSchema schema = (ProtobufSchema) registry.getSchemaById(id); + Descriptors.Descriptor descriptor = schema.toDescriptor(); + byte[] rawMessage = new byte[length]; + bytes.get(rawMessage, 0, length); + DynamicMessage message = DynamicMessage.parseFrom(descriptor, rawMessage); + return message; + } + catch (Exception e) { Review comment: maybe the error handling in this "try" block should be split up so that the error messaging for failing to fetch the schema from the registry, which doesn't exactly seem like a parse exception, can be distinguished from an actual parse exception ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
