Repository: avro Updated Branches: refs/heads/master a7a43da6d -> 673261c86
AVRO-2109: Reset buffers in case of IOException Closes #260 Signed-off-by: Zoltan Ivanfi <[email protected]> Signed-off-by: sacharya <[email protected]> Signed-off-by: Nandor Kollar <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/673261c8 Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/673261c8 Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/673261c8 Branch: refs/heads/master Commit: 673261c8656124cc58bee65fe5e8c779350779ee Parents: a7a43da Author: Gabor Szadovszky <[email protected]> Authored: Tue Dec 5 19:37:11 2017 +0100 Committer: Gabor Szadovszky <[email protected]> Committed: Thu Jan 4 17:12:25 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/avro/file/DataFileWriter.java | 30 ++++++-- .../apache/avro/io/BufferedBinaryEncoder.java | 7 +- .../avro/file/TestIOExceptionDuringWrite.java | 79 ++++++++++++++++++++ 4 files changed, 108 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/673261c8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 04c4923..eb582cd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -107,6 +107,8 @@ Trunk (not yet released) AVRO-2117: Overall cleanup of java code (Niels Basjes) + AVRO-2109: Reset buffers in case of IOException (gabor) + BUG FIXES AVRO-1741: Python3: Fix error when codec is not in the header. 
http://git-wip-us.apache.org/repos/asf/avro/blob/673261c8/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java index f0f6e38..37c4613 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java @@ -395,14 +395,17 @@ public class DataFileWriter<D> implements Closeable, Flushable { private void writeBlock() throws IOException { if (blockCount > 0) { - bufOut.flush(); - ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer(); - DataBlock block = new DataBlock(uncompressed, blockCount); - block.setFlushOnWrite(flushOnEveryBlock); - block.compressUsing(codec); - block.writeBlockTo(vout, sync); - buffer.reset(); - blockCount = 0; + try { + bufOut.flush(); + ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer(); + DataBlock block = new DataBlock(uncompressed, blockCount); + block.setFlushOnWrite(flushOnEveryBlock); + block.compressUsing(codec); + block.writeBlockTo(vout, sync); + } finally { + buffer.reset(); + blockCount = 0; + } } } @@ -474,6 +477,17 @@ public class DataFileWriter<D> implements Closeable, Flushable { } public long tell() { return position+count; } + + @Override + public synchronized void flush() throws IOException { + try { + super.flush(); + } finally { + // Ensure that count is reset in any case to avoid writing garbage to the end of the file in case of an error + // occurred during the write + count = 0; + } + } } private static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream { http://git-wip-us.apache.org/repos/asf/avro/blob/673261c8/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java ---------------------------------------------------------------------- diff 
--git a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java index f927f4d..c5403ea 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java @@ -90,8 +90,11 @@ public class BufferedBinaryEncoder extends BinaryEncoder { */ private void flushBuffer() throws IOException { if (pos > 0) { - sink.innerWrite(buf, 0, pos); - pos = 0; + try { + sink.innerWrite(buf, 0, pos); + } finally { + pos = 0; + } } } http://git-wip-us.apache.org/repos/asf/avro/blob/673261c8/lang/java/avro/src/test/java/org/apache/avro/file/TestIOExceptionDuringWrite.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/test/java/org/apache/avro/file/TestIOExceptionDuringWrite.java b/lang/java/avro/src/test/java/org/apache/avro/file/TestIOExceptionDuringWrite.java new file mode 100644 index 0000000..97914c4 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/file/TestIOExceptionDuringWrite.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.avro.file; + +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.avro.RandomData; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.junit.Test; + +/* + * Tests that we do not write any garbage to the end of the file after an exception has occurred + */ +public class TestIOExceptionDuringWrite { + + private static class FailingOutputStream extends OutputStream { + private int byteCnt; + + public FailingOutputStream(int failAfter) { + byteCnt = failAfter; + } + + @Override + public void write(int b) throws IOException { + if (byteCnt > 0) { + --byteCnt; + } else if (byteCnt == 0) { + --byteCnt; + throw new IOException("Artificial failure from FailingOutputStream"); + } else { + fail("No bytes should have been written after IOException"); + } + } + } + + private static final String SCHEMA_JSON = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": [" + + "{\"name\":\"stringField\", \"type\":\"string\"}," + "{\"name\":\"longField\", \"type\":\"long\"}]}"; + private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON); + + @Test + public void testNoWritingAfterException() throws IOException { + DataFileWriter<Object> writer = new DataFileWriter<Object>(new GenericDatumWriter<Object>()); + try { + writer.create(SCHEMA, new FailingOutputStream(100000)); + int recordCnt = 0; + for (Object datum : new RandomData(SCHEMA, 100000, 42)) { + writer.append(datum); + if (++recordCnt % 17 == 0) { + writer.flush(); + } + } + } catch (IOException e) { + return; + } finally { + writer.close(); + } + fail("IOException should have been thrown"); + } + +}
