This is an automated email from the ASF dual-hosted git repository. emkornfield pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new dd4532a ARROW-6199: [Java] Avro adapter avoid potential resource leak. dd4532a is described below commit dd4532a0cdaccf8e7811086bc5360b13ef9a6c36 Author: tianchen <niki...@alibaba-inc.com> AuthorDate: Thu Aug 15 19:49:53 2019 -0700 ARROW-6199: [Java] Avro adapter avoid potential resource leak. Related to [ARROW-6199](https://issues.apache.org/jira/browse/ARROW-6199). Currently, the Avro consumer interface has no close API, which may cause resource leaks such as AvroBytesConsumer#cacheBuffer. To resolve this, make Consumer extend AutoCloseable and create CompositeAvroConsumer to encompass the consume and close logic. Closes #5059 from tianchen92/ARROW-6199 and squashes the following commits: d60d94c48 <tianchen> fix 42f22da7c <tianchen> clear vectors in close 5b91da75f <tianchen> fix comments 3ffc07600 <tianchen> ARROW-6199: Avro adapter avoid potential resource leak. Authored-by: tianchen <niki...@alibaba-inc.com> Signed-off-by: Micah Kornfield <emkornfi...@gmail.com> --- .../java/org/apache/arrow/AvroToArrowUtils.java | 22 +++---- .../arrow/consumers/AvroBooleanConsumer.java | 5 ++ .../apache/arrow/consumers/AvroBytesConsumer.java | 5 ++ .../apache/arrow/consumers/AvroDoubleConsumer.java | 5 ++ .../apache/arrow/consumers/AvroFloatConsumer.java | 5 ++ .../apache/arrow/consumers/AvroIntConsumer.java | 5 ++ .../apache/arrow/consumers/AvroLongConsumer.java | 5 ++ .../apache/arrow/consumers/AvroNullConsumer.java | 5 ++ .../apache/arrow/consumers/AvroStringConsumer.java | 5 ++ .../apache/arrow/consumers/AvroUnionsConsumer.java | 16 +++-- .../arrow/consumers/CompositeAvroConsumer.java | 69 ++++++++++++++++++++++ .../java/org/apache/arrow/consumers/Consumer.java | 7 ++- .../arrow/consumers/NullableTypeConsumer.java | 5 ++ 13 files changed, 141 insertions(+), 18 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java 
b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 25611a5..77f34df 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -20,7 +20,6 @@ package org.apache.arrow; import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; -import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -37,6 +36,7 @@ import org.apache.arrow.consumers.AvroLongConsumer; import org.apache.arrow.consumers.AvroNullConsumer; import org.apache.arrow.consumers.AvroStringConsumer; import org.apache.arrow.consumers.AvroUnionsConsumer; +import org.apache.arrow.consumers.CompositeAvroConsumer; import org.apache.arrow.consumers.Consumer; import org.apache.arrow.consumers.NullableTypeConsumer; import org.apache.arrow.memory.BufferAllocator; @@ -246,19 +246,15 @@ public class AvroToArrowUtils { VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0); - int valueCount = 0; - while (true) { - try { - for (Consumer consumer : consumers) { - consumer.consume(decoder); - } - valueCount++; - //reach end will throw EOFException. 
- } catch (EOFException eofException) { - root.setRowCount(valueCount); - break; - } + CompositeAvroConsumer compositeConsumer = null; + try { + compositeConsumer = new CompositeAvroConsumer(consumers); + compositeConsumer.consume(decoder, root); + } catch (Exception e) { + compositeConsumer.close(); + throw new RuntimeException("Error occurs while consume process.", e); } + return root; } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java index b2fe704..c2876f1 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java @@ -63,4 +63,9 @@ public class AvroBooleanConsumer implements Consumer { return this.vector; } + @Override + public void close() throws Exception { + writer.close(); + } + } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java index 2c649f9..c0cfaec 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java @@ -79,4 +79,9 @@ public class AvroBytesConsumer implements Consumer { public FieldVector getVector() { return vector; } + + @Override + public void close() throws Exception { + writer.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java index 63b2071..6538831 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java @@ -62,4 +62,9 @@ public class AvroDoubleConsumer implements 
Consumer { public FieldVector getVector() { return vector; } + + @Override + public void close() throws Exception { + writer.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java index ea752e2..6256a9a 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java @@ -62,4 +62,9 @@ public class AvroFloatConsumer implements Consumer { public FieldVector getVector() { return this.vector; } + + @Override + public void close() throws Exception { + writer.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java index ab830bc..854c8d0 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java @@ -62,4 +62,9 @@ public class AvroIntConsumer implements Consumer { public FieldVector getVector() { return this.vector; } + + @Override + public void close() throws Exception { + writer.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java index 68acb94..e0095cc 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java @@ -62,4 +62,9 @@ public class AvroLongConsumer implements Consumer { public FieldVector getVector() { return this.vector; } + + @Override + public void close() throws Exception { + writer.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java 
b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java index d06e2f5..1e32419 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java @@ -48,4 +48,9 @@ public class AvroNullConsumer implements Consumer { public FieldVector getVector() { return this.vector; } + + @Override + public void close() { + vector.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java index 1719bf7..850d699 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -80,4 +80,9 @@ public class AvroStringConsumer implements Consumer { public FieldVector getVector() { return this.vector; } + + @Override + public void close() throws Exception { + writer.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java index 5277678..b927a5b 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java @@ -31,7 +31,7 @@ import org.apache.avro.io.Decoder; */ public class AvroUnionsConsumer implements Consumer { - private Consumer[] indexDelegates; + private Consumer[] delegates; private Types.MinorType[] types; private UnionWriter writer; @@ -40,11 +40,11 @@ public class AvroUnionsConsumer implements Consumer { /** * Instantiate a AvroUnionConsumer. 
*/ - public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) { + public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) { this.writer = new UnionWriter(vector); this.vector = vector; - this.indexDelegates = indexDelegates; + this.delegates = delegates; this.types = types; } @@ -53,7 +53,7 @@ public class AvroUnionsConsumer implements Consumer { int fieldIndex = decoder.readInt(); int position = writer.getPosition(); - Consumer delegate = indexDelegates[fieldIndex]; + Consumer delegate = delegates[fieldIndex]; vector.setType(position, types[fieldIndex]); // In UnionVector we need to set sub vector writer position before consume a value @@ -80,4 +80,12 @@ public class AvroUnionsConsumer implements Consumer { vector.setValueCount(writer.getPosition()); return this.vector; } + + @Override + public void close() throws Exception { + writer.close(); + for (Consumer delegate: delegates) { + delegate.close(); + } + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java new file mode 100644 index 0000000..0f4b3c3 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.io.Decoder; + +/** + * Composite consumer which hold all consumers. + * It manages the consume and cleanup process. + */ +public class CompositeAvroConsumer implements AutoCloseable { + + private final List<Consumer> consumers; + + public CompositeAvroConsumer(List<Consumer> consumers) { + this.consumers = consumers; + } + + /** + * Consume decoder data and write into {@link VectorSchemaRoot}. + */ + public void consume(Decoder decoder, VectorSchemaRoot root) throws IOException { + int valueCount = 0; + while (true) { + try { + for (Consumer consumer : consumers) { + consumer.consume(decoder); + } + valueCount++; + //reach end will throw EOFException. 
+ } catch (EOFException eofException) { + root.setRowCount(valueCount); + break; + } + } + } + + @Override + public void close() { + // clean up + for (Consumer consumer : consumers) { + try { + consumer.close(); + } catch (Exception e) { + throw new RuntimeException("Error occurs in close.", e); + } + } + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java index c3a543c..be318f6 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java @@ -25,7 +25,7 @@ import org.apache.avro.io.Decoder; /** * Interface that is used to consume values from avro decoder. */ -public interface Consumer { +public interface Consumer extends AutoCloseable { /** * Consume a specific type value from avro decoder and write it to vector. @@ -48,4 +48,9 @@ public interface Consumer { * Get the vector within the consumer. */ FieldVector getVector(); + + /** + * Close this consumer when occurs exception to avoid potential leak. + */ + void close() throws Exception; } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java index 5ac7bd7..05216b1 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java @@ -65,4 +65,9 @@ public class NullableTypeConsumer implements Consumer { return delegate.getVector(); } + @Override + public void close() throws Exception { + delegate.close(); + } + }