jcmcote commented on a change in pull request #1500: DRILL-6820: Msgpack format reader
URL: https://github.com/apache/drill/pull/1500#discussion_r231340882
##########
File path:
contrib/format-msgpack/src/main/java/org/apache/drill/exec/store/msgpack/MsgpackSchema.java
##########
@@ -0,0 +1,114 @@
+package org.apache.drill.exec.store.msgpack;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.exception.SchemaChangeRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField.Builder;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+
+import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
+
+public class MsgpackSchema {
+ public static final String SCHEMA_FILE_NAME = ".schema.proto";
+
+ @SuppressWarnings("unused")
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MsgpackSchema.class);
+
+ private DrillFileSystem fileSystem;
+
+ public MsgpackSchema(DrillFileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
+
+ public MaterializedField load(Path schemaLocation) throws AccessControlException, FileNotFoundException, IOException {
+ MaterializedField previousMapField = null;
+ if (schemaLocation != null && fileSystem.exists(schemaLocation)) {
+ try (FSDataInputStream in = fileSystem.open(schemaLocation)) {
+ String schemaData = IOUtils.toString(in);
+ Builder newBuilder = SerializedField.newBuilder();
+ try {
+ TextFormat.merge(schemaData, newBuilder);
+ } catch (ParseException e) {
+ throw new DrillRuntimeException("Failed to read schema file: " + schemaLocation, e);
+ }
+ SerializedField read = newBuilder.build();
+ previousMapField = MaterializedField.create(read);
+ }
+ }
+ return previousMapField;
+ }
+
+ public void save(MaterializedField mapField, Path schemaLocation) throws IOException {
+ try (FSDataOutputStream out = fileSystem.create(schemaLocation, true)) {
+ SerializedField serializedMapField = mapField.getSerializedField();
+ String data = TextFormat.printToString(serializedMapField);
+ IOUtils.write(data, out);
+ }
+ }
+
+ public MaterializedField merge(MaterializedField existingField, MaterializedField newField) {
Review comment:
I don't handle multiple files in parallel right now.
Schema evolution is also not handled. As you will recall from my questions on
the dev mailing list, I have come across cases where an array would contain
both VARCHAR and VARBINARY values. In that case I would get a schema with an
array of type VARCHAR (having skipped over the error of not being able to
write VARBINARY into that array).
By looking at the logs I can see that these values are being skipped. I then
manually update the schema file to tell it that I want that array to be
VARBINARY. From then on, when I read, the VARCHAR values are coerced into
VARBINARY.
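For reference, that manual edit is just a change to the protobuf text format
stored in the `.schema.proto` file. The snippet below is illustrative only:
the field name `myArray` is made up, but the `child`/`major_type`/`name_part`
layout follows Drill's `SerializedField` message:
```
child {
  major_type {
    minor_type: VARBINARY   # was VARCHAR after schema learning
    mode: REPEATED
  }
  name_part {
    name: "myArray"
  }
}
```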
So for now I use schema learning to discover the bulk of the structure and
fix any edge cases manually.
But I think the foundations are there to build a smarter discovery mechanism
that could evolve the schema as it sees more of the data, as sketched below.
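To give an idea of what that smarter mechanism might look like: the merge of
two observed leaf types could widen VARCHAR into VARBINARY automatically
instead of relying on the manual edit. This is only a rough sketch, not the
`merge()` in this patch; `widen()` is a hypothetical helper, and it assumes
the `Types.withMode()` utility from `org.apache.drill.common.types`:
```java
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.MaterializedField;

// Hypothetical: reconcile two observed leaf types into one schema type.
private MaterializedField widen(MaterializedField existing, MaterializedField discovered) {
  MinorType oldType = existing.getType().getMinorType();
  MinorType newType = discovered.getType().getMinorType();
  if (oldType == newType) {
    return existing;
  }
  // VARCHAR values can always be stored as VARBINARY, so when the two
  // types disagree in that pair, evolve the schema toward VARBINARY,
  // keeping the field's original data mode (e.g. REPEATED for arrays).
  if ((oldType == MinorType.VARCHAR && newType == MinorType.VARBINARY)
      || (oldType == MinorType.VARBINARY && newType == MinorType.VARCHAR)) {
    return MaterializedField.create(existing.getName(),
        Types.withMode(MinorType.VARBINARY, existing.getType().getMode()));
  }
  // Anything else is a real conflict the discovery cannot resolve yet.
  throw new DrillRuntimeException("Cannot evolve " + oldType + " into "
      + newType + " for field " + existing.getName());
}
```
Each time a new file is read, the learned schema could be passed through this
kind of widening before being written back with `save()`, so the stored
`.schema.proto` would converge as more of the data is seen.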
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services