Repository: incubator-gobblin Updated Branches: refs/heads/master 55a19bbfe -> 60d4e61d0
[GOBBLIN-671] Close the underlying writer when a HiveWritableHdfsData… Closes #2541 from htran1/hive_writer_close Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/60d4e61d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/60d4e61d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/60d4e61d Branch: refs/heads/master Commit: 60d4e61d09664e4ea2af511a3a53f6cd81d1963a Parents: 55a19bb Author: Hung Tran <[email protected]> Authored: Fri Jan 25 11:30:55 2019 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Jan 25 11:30:55 2019 -0800 ---------------------------------------------------------------------- .../org/apache/gobblin/writer/FsDataWriter.java | 29 ++++--- .../writer/HiveWritableHdfsDataWriter.java | 20 ++++- .../writer/HiveWritableHdfsDataWriterTest.java | 89 ++++++++++++++++++++ .../resources/writer/hive_writer.properties | 21 +++++ 4 files changed, 144 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java index d814622..dadb51f 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java @@ -261,6 +261,21 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta // For the same reason as deleting the staging file if it already exists, overwrite // the output file if it already exists to prevent task retry from being blocked. 
HadoopUtils.renamePath(this.fileContext, this.stagingFile, this.outputFile, true); + + // The staging file is moved to the output path in commit, so rename to add record count after that + if (this.shouldIncludeRecordCountInFileName) { + String filePathWithRecordCount = addRecordCountToFileName(); + this.properties.appendToSetProp(this.allOutputFilesPropName, filePathWithRecordCount); + } else { + this.properties.appendToSetProp(this.allOutputFilesPropName, getOutputFilePath()); + } + + FsWriterMetrics metrics = new FsWriterMetrics( + this.id, + new PartitionIdentifier(this.partitionKey, this.branchId), + ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten())) + ); + this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson()); } /** @@ -285,20 +300,6 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta public void close() throws IOException { this.closer.close(); - - if (this.shouldIncludeRecordCountInFileName) { - String filePathWithRecordCount = addRecordCountToFileName(); - this.properties.appendToSetProp(this.allOutputFilesPropName, filePathWithRecordCount); - } else { - this.properties.appendToSetProp(this.allOutputFilesPropName, getOutputFilePath()); - } - - FsWriterMetrics metrics = new FsWriterMetrics( - this.id, - new PartitionIdentifier(this.partitionKey, this.branchId), - ImmutableSet.of(new FsWriterMetrics.FileInfo(this.outputFile.getName(), recordsWritten())) - ); - this.properties.setProp(FS_WRITER_METRICS_KEY, metrics.toJson()); } private synchronized String addRecordCountToFileName() http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java index b1a3046..1145920 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java @@ -44,6 +44,8 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> { protected final RecordWriter writer; protected final AtomicLong count = new AtomicLong(0); + // the close method may be invoked multiple times, but the underlying writer only supports close being called once + private boolean closed = false; public HiveWritableHdfsDataWriter(HiveWritableHdfsDataWriterBuilder<?> builder, State properties) throws IOException { super(builder, properties); @@ -92,8 +94,24 @@ public class HiveWritableHdfsDataWriter extends FsDataWriter<Writable> { } @Override + public void close() throws IOException { + // close the underlying writer if not already closed. The close can only be called once for the underlying writer, + // so remember the state + if (!this.closed) { + this.writer.close(false); + this.closed = true; + } + + super.close(); + } + + @Override public void commit() throws IOException { - this.writer.close(false); + if (!this.closed) { + this.writer.close(false); + this.closed = true; + } + super.commit(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java new file mode 100644 index 0000000..5fb5ee4 --- /dev/null +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.writer; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; + +public class HiveWritableHdfsDataWriterTest { + private FileSystem fs; + private File tmpDir; + + @BeforeClass + public void setUp() throws IOException { + tmpDir = Files.createTempDir(); + this.fs = FileSystem.get(new Configuration()); + } + + @AfterClass + public void cleanUp() throws IOException { + if (this.fs.exists(new Path(this.tmpDir.getAbsolutePath()))) { + if (!this.fs.delete(new Path(this.tmpDir.getAbsolutePath()), true)) { + throw new IOException("Failed to clean up path " + this.tmpDir); + } + } + } + + /** + * Test that multiple close calls do 
not raise an error + */ + @Test + public void testMultipleClose() throws IOException { + Properties properties = new Properties(); + properties.load(new FileReader("gobblin-core/src/test/resources/writer/hive_writer.properties")); + + properties.setProperty("writer.staging.dir", new Path(tmpDir.getAbsolutePath(), "output-staging").toString()); + properties.setProperty("writer.output.dir", new Path(tmpDir.getAbsolutePath(), "output").toString()); + properties.setProperty("writer.file.path", "."); + + SourceState sourceState = new SourceState(new State(properties), ImmutableList.<WorkUnitState> of()); + + DataWriter writer = new HiveWritableHdfsDataWriterBuilder<>().withBranches(1) + .withWriterId("0").writeTo(Destination.of(Destination.DestinationType.HDFS, sourceState)) + .writeInFormat(WriterOutputFormat.ORC).build(); + + writer.close(); + // check for file existence + Assert.assertTrue(this.fs.exists(new Path(new Path(tmpDir.getAbsolutePath(), "output-staging"), "writer-output.orc")), + "staging file not found"); + + // closed again is okay + writer.close(); + // commit after close is okay + writer.commit(); + Assert.assertTrue(this.fs.exists(new Path(new Path(tmpDir.getAbsolutePath(), "output"), "writer-output.orc")), + "output file not found"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/60d4e61d/gobblin-core/src/test/resources/writer/hive_writer.properties ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/writer/hive_writer.properties b/gobblin-core/src/test/resources/writer/hive_writer.properties new file mode 100644 index 0000000..38edf4a --- /dev/null +++ b/gobblin-core/src/test/resources/writer/hive_writer.properties @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +serde.serializer.type=ORC +serde.deserializer.type=AVRO +writer.file.name=writer-output.orc +
