This is an automated email from the ASF dual-hosted git repository.
austin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aebd8780724 [YAML] - Pick file descriptor based on messageName (#30314)
aebd8780724 is described below
commit aebd878072455d40d1bd1b302e0fc60d2f9df573
Author: Ferran Fernández Garrido <[email protected]>
AuthorDate: Thu Feb 15 16:58:44 2024 +0100
[YAML] - Pick file descriptor based on messageName (#30314)
---
.../sdk/extensions/protobuf/ProtoByteUtils.java | 35 ++++++++++++++++++----
1 file changed, 29 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
index dd73739246d..6d048a088b7 100644
---
a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
+++
b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoByteUtils.java
@@ -36,6 +36,7 @@ import java.io.InputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -72,7 +73,7 @@ public class ProtoByteUtils {
* @return The Beam Schema representing the Protocol Buffer message.
*/
public static Schema getBeamSchemaFromProto(String fileDescriptorPath,
String messageName) {
- ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath);
+ ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath, messageName);
ProtoDomain protoDomain = dpd.getProtoDomain();
return ProtoDynamicMessageSchema.forDescriptor(protoDomain,
messageName).getSchema();
}
@@ -146,7 +147,7 @@ public class ProtoByteUtils {
public static SerializableFunction<byte[], Row> getProtoBytesToRowFunction(
String fileDescriptorPath, String messageName) {
- ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath);
+ ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath,
messageName);
ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain();
@SuppressWarnings("unchecked")
ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema =
@@ -192,7 +193,7 @@ public class ProtoByteUtils {
public static SerializableFunction<Row, byte[]> getRowToProtoBytes(
String fileDescriptorPath, String messageName) {
- ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath);
+ ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath,
messageName);
ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain();
@SuppressWarnings("unchecked")
ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema =
@@ -213,16 +214,38 @@ public class ProtoByteUtils {
* file.
*
* @param fileDescriptorPath The path to the File Descriptor Set file.
+ * @param messageName The name of the message type for which the descriptor is desired.
* @return ProtoSchemaInfo containing the associated ProtoDomain and File Name.
* @throws RuntimeException if an error occurs during schema retrieval.
*/
- private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath) {
+ private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath,
String messageName) {
byte[] from = getFileAsBytes(fileDescriptorPath);
try {
+ List<String> messageElements = Splitter.on('.').splitToList(messageName);
+ String messageTypeByName = messageElements.get(messageElements.size() -
1);
+
DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(from);
- return new ProtoSchemaInfo(
- descriptorSet.getFile(0).getName(),
ProtoDomain.buildFrom(descriptorSet));
+
+ ProtoDomain protoDomain = ProtoDomain.buildFrom(descriptorSet);
+ List<String> fileProtoNames = new ArrayList<>();
+
+ descriptorSet
+ .getFileList()
+ .forEach(fileDescriptorProto ->
fileProtoNames.add(fileDescriptorProto.getName()));
+
+ String fullName =
+ fileProtoNames.stream()
+ .filter(
+ name ->
+
protoDomain.getFileDescriptor(name).findMessageTypeByName(messageTypeByName)
+ != null)
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new NullPointerException("Couldn't locate the proto for
that message name"));
+
+ return new ProtoSchemaInfo(fullName, protoDomain);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}