lidavidm commented on code in PR #638:
URL: https://github.com/apache/arrow-java/pull/638#discussion_r1970974511


##########
adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/Producer.java:
##########


Review Comment:
   nit: AvroProducer since everything else is named like that?



##########
adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/CompositeAvroProducer.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.avro.producers;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.io.Encoder;
+
+/** Composite producer which holds all producers. It manages the produce and 
cleanup process. */
+public class CompositeAvroProducer implements AutoCloseable {
+
+  private final List<Producer<? extends FieldVector>> producers;
+
+  public CompositeAvroProducer(List<Producer<? extends FieldVector>> 
producers) {
+    this.producers = producers;
+  }
+
+  public List<Producer<?>> getProducers() {
+    return producers;
+  }
+
+  /** Produce encoder data. */
+  public void produce(Encoder encoder) throws IOException {
+    for (Producer<? extends FieldVector> producer : producers) {
+      producer.produce(encoder);
+    }
+  }
+
+  /** Reset vector of consumers with the given {@link VectorSchemaRoot}. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public void resetProducerVectors(VectorSchemaRoot root) {
+    // This method assumes that the VSR matches the constructed set of 
producers
+    int index = 0;
+    for (Producer producer : producers) {
+      if (producer.resetValueVector(root.getFieldVectors().get(index))) {
+        index++;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    // clean up
+    try {
+      AutoCloseables.close(producers);
+    } catch (Exception e) {
+      throw new RuntimeException("Error occurs in close.", e);

Review Comment:
   Normally `close` is declared to throw `Exception` anyways - do we need to 
rewrap this in RuntimeException?
   
   Otherwise use `AutoCloseables.closeNoChecked`



##########
adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/Producer.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adapter.avro.producers;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.avro.io.Encoder;
+
+import java.io.IOException;
+
+/**
+ * Interface that is used to produce values to avro encoder.
+ *
+ * @param <T> The vector within producer or its delegate, used for partially 
produce purpose.
+ */
+public interface Producer<T extends FieldVector> extends AutoCloseable {
+
+  /**
+   * Produce a specific type value from the vector and write it to avro 
encoder.
+   *
+   * @param encoder avro encoder to write data
+   * @throws IOException on error
+   */
+  void produce(Encoder encoder) throws IOException;
+
+  /** Skip null value in the vector by setting reader position + 1. */
+  void skipNull();
+
+  /** Set the position to read value from vector. */
+  void setPosition(int index);

Review Comment:
   I think we've been trying to be future-proof and use `long` for indices now 
(since eventually once we support the new memory APIs we'll be able to address 
more than 2GiB of memory at once)



##########
adapter/avro/src/main/java/org/apache/arrow/adapter/avro/producers/BaseAvroProducer.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adapter.avro.producers;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * Base class for avro producers.
+ *
+ * @param <T> vector type.
+ */
+public abstract class BaseAvroProducer<T extends FieldVector> implements 
Producer<T> {
+
+  protected T vector;
+  protected int currentIndex;
+
+  /**
+   * Constructs a base avro consumer.
+   *
+   * @param vector the vector to consume.
+   */
+  protected BaseAvroProducer(T vector) {
+    this.vector = vector;
+  }
+
+  @Override
+  public void skipNull() {
+    currentIndex++;
+  }
+
+  @Override
+  public void setPosition(int index) {
+    currentIndex = index;
+  }
+
+  @Override
+  public void close() throws Exception {
+    vector.close();
+  }
+
+  @Override
+  public boolean resetValueVector(T vector) {
+    this.vector = vector;
+    this.currentIndex = 0;
+    return true;
+  }

Review Comment:
   Hmm. Should resetting close the prior vector? If not, then maybe we also 
shouldn't be closing the vector here (i.e. it's the upper level code's 
responsibility to manage the vector lifetime and the producer is just 
temporarily holding the vector, not owning it)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to