This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 49e8627 PARQUET-1599: Fix to-avro to respect the overwrite option (#650)
49e8627 is described below
commit 49e862704e7da7149493a953ccc18515d1eada88
Author: Kengo Seki <[email protected]>
AuthorDate: Sat Apr 11 16:01:27 2020 +0900
PARQUET-1599: Fix to-avro to respect the overwrite option (#650)
* PARQUET-1599: Fix to-avro to respect the overwrite option
* Address the same problem on SchemaCommand
* Remove unused variables
* Consolidate redundant try clauses
---
.../java/org/apache/parquet/cli/BaseCommand.java | 24 ++++++++++++++++-
.../apache/parquet/cli/commands/SchemaCommand.java | 11 ++------
.../apache/parquet/cli/commands/ToAvroCommand.java | 15 +++--------
.../apache/parquet/cli/commands/AvroFileTest.java | 23 ++++++++++++-----
.../parquet/cli/commands/SchemaCommandTest.java | 30 ++++++++++++++++++++++
.../parquet/cli/commands/ToAvroCommandTest.java | 19 ++++++++++++++
6 files changed, 94 insertions(+), 28 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index cdef53d..c994a32 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -140,15 +140,37 @@ public abstract class BaseCommand implements Command, Configurable {
return create(filename, false);
}
+ /**
+ * Creates a file and returns an open {@link FSDataOutputStream}.
+ *
+ * If the file does not have a file system scheme, this uses the default FS.
+ *
+ * This will neither produce checksum files nor overwrite a file that already
+ * exists.
+ *
+ * @param filename The filename to create
+ * @return An open FSDataOutputStream
+ * @throws IOException if there is an error creating the file
+ */
+ public FSDataOutputStream createWithNoOverwrite(String filename)
+ throws IOException {
+ return create(filename, true, false);
+ }
+
private FSDataOutputStream create(String filename, boolean noChecksum)
throws IOException {
+ return create(filename, noChecksum, true);
+ }
+
+ private FSDataOutputStream create(String filename, boolean noChecksum,
boolean overwrite)
+ throws IOException {
Path filePath = qualifiedPath(filename);
// even though it was qualified using the default FS, it may not be in it
FileSystem fs = filePath.getFileSystem(getConf());
if (noChecksum && fs instanceof ChecksumFileSystem) {
fs = ((ChecksumFileSystem) fs).getRawFileSystem();
}
- return fs.create(filePath, true /* overwrite */);
+ return fs.create(filePath, overwrite);
}
/**
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
index ea2306f..ca29dd0 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Lists;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.util.Formats;
import org.apache.avro.file.SeekableInput;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -80,14 +79,8 @@ public class SchemaCommand extends BaseCommand {
String source = targets.get(0);
if (outputPath != null) {
- Path outPath = qualifiedPath(outputPath);
- FileSystem outFS = outPath.getFileSystem(getConf());
- if (overwrite && outFS.exists(outPath)) {
- console.debug("Deleting output file {} (already exists)", outPath);
- outFS.delete(outPath);
- }
-
- try (OutputStream out = create(outputPath)) {
+ try (OutputStream out = overwrite ?
+ create(outputPath) : createWithNoOverwrite(outputPath)) {
out.write(getSchema(source).getBytes(StandardCharsets.UTF_8));
}
} else {
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
index d659109..2e04d74 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
@@ -29,7 +29,6 @@ import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.util.Codecs;
@@ -37,6 +36,7 @@ import org.apache.parquet.cli.util.Schemas;
import org.slf4j.Logger;
import java.io.Closeable;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.List;
import static org.apache.avro.generic.GenericData.Record;
@@ -92,16 +92,8 @@ public class ToAvroCommand extends BaseCommand {
} else {
schema = getAvroSchema(source);
}
- final Schema projection = filterSchema(schema, columns);
-
- Path outPath = qualifiedPath(outputPath);
- try (FileSystem outFS = outPath.getFileSystem(getConf())) {
- if (overwrite && outFS.exists(outPath)) {
- console.debug("Deleting output file {} (already exists)", outPath);
- outFS.delete(outPath);
- }
- }
+ final Schema projection = filterSchema(schema, columns);
Iterable<Record> reader = openDataFile(source, projection);
boolean threw = true;
long count = 0;
@@ -109,7 +101,8 @@ public class ToAvroCommand extends BaseCommand {
DatumWriter<Record> datumWriter = new GenericDatumWriter<>(schema);
try (DataFileWriter<Record> fileWriter = new
DataFileWriter<>(datumWriter)) {
fileWriter.setCodec(codecFactory);
- try (DataFileWriter<Record> writer=fileWriter.create(projection,
create(outputPath))) {
+ try (OutputStream os = overwrite ? create(outputPath) :
createWithNoOverwrite(outputPath);
+ DataFileWriter<Record> writer = fileWriter.create(projection, os)) {
for (Record record : reader) {
writer.append(record);
count += 1;
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
index 40c05c7..1aa6f98 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
@@ -26,19 +26,28 @@ import java.util.Arrays;
public class AvroFileTest extends ParquetFileTest {
- protected File toAvro(File parquetFile) throws IOException {
- return toAvro(parquetFile, "GZIP");
+ protected File toAvro(File inputFile) throws IOException {
+ return toAvro(inputFile, "GZIP");
}
- protected File toAvro(File parquetFile, String compressionCodecName) throws
IOException {
+ protected File toAvro(File inputFile, String compressionCodecName) throws
IOException {
+ File outputFile = new File(getTempFolder(), getClass().getSimpleName() +
".avro");
+ return toAvro(inputFile, outputFile, false, compressionCodecName);
+ }
+
+ protected File toAvro(File inputFile, File outputFile, boolean overwrite)
throws IOException {
+ return toAvro(inputFile, outputFile, overwrite, "GZIP");
+ }
+
+ protected File toAvro(File inputFile, File outputFile, boolean overwrite,
String compressionCodecName) throws IOException {
ToAvroCommand command = new ToAvroCommand(createLogger());
- command.targets = Arrays.asList(parquetFile.getAbsolutePath());
- File output = new File(getTempFolder(), getClass().getSimpleName() +
".avro");
- command.outputPath = output.getAbsolutePath();
+ command.targets = Arrays.asList(inputFile.getAbsolutePath());
+ command.outputPath = outputFile.getAbsolutePath();
command.compressionCodecName = compressionCodecName;
+ command.overwrite = overwrite;
command.setConf(new Configuration());
int exitCode = command.run();
assert(exitCode == 0);
- return output;
+ return outputFile;
}
}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java
index ad03aac..36e122b 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.parquet.cli.commands;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.junit.Assert;
import org.junit.Test;
@@ -35,4 +37,32 @@ public class SchemaCommandTest extends ParquetFileTest {
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}
+
+ @Test
+ public void testSchemaCommandOverwriteExistentFile() throws IOException {
+ File inputFile = parquetFile();
+ File outputFile = new File(getTempFolder(), getClass().getSimpleName() +
".avsc");
+ FileUtils.touch(outputFile);
+ Assert.assertEquals(0, outputFile.length());
+ SchemaCommand command = new SchemaCommand(createLogger());
+ command.targets = Arrays.asList(inputFile.getAbsolutePath());
+ command.outputPath = outputFile.getAbsolutePath();
+ command.overwrite = true;
+ command.setConf(new Configuration());
+ Assert.assertEquals(0, command.run());
+ Assert.assertTrue(0 < outputFile.length());
+ }
+
+
+ @Test(expected = FileAlreadyExistsException.class)
+ public void testSchemaCommandOverwriteExistentFileWithoutOverwriteOption()
throws IOException {
+ File inputFile = parquetFile();
+ File outputFile = new File(getTempFolder(), getClass().getSimpleName() +
".avsc");
+ FileUtils.touch(outputFile);
+ SchemaCommand command = new SchemaCommand(createLogger());
+ command.targets = Arrays.asList(inputFile.getAbsolutePath());
+ command.outputPath = outputFile.getAbsolutePath();
+ command.setConf(new Configuration());
+ command.run();
+ }
}
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
index 9344a78..8f5490e 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
@@ -20,6 +20,8 @@
package org.apache.parquet.cli.commands;
import com.beust.jcommander.JCommander;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -62,6 +64,7 @@ public class ToAvroCommandTest extends AvroFileTest {
.addObject(cmd)
.build()
.parse(
+ "--overwrite",
jsonInputFile.getAbsolutePath(),
"--output",
avroOutputFile.getAbsolutePath()
@@ -91,4 +94,20 @@ public class ToAvroCommandTest extends AvroFileTest {
public void testToAvroCommandWithInvalidCompression() throws IOException {
toAvro(parquetFile(), "FOO");
}
+
+ @Test
+ public void testToAvroCommandOverwriteExistentFile() throws IOException {
+ File outputFile = new File(getTempFolder(), getClass().getSimpleName() +
".avro");
+ FileUtils.touch(outputFile);
+ Assert.assertEquals(0, outputFile.length());
+ File avroFile = toAvro(parquetFile(), outputFile, true);
+ Assert.assertTrue(0 < avroFile.length());
+ }
+
+ @Test(expected = FileAlreadyExistsException.class)
+ public void testToAvroCommandOverwriteExistentFileWithoutOverwriteOption()
throws IOException {
+ File outputFile = new File(getTempFolder(), getClass().getSimpleName() +
".avro");
+ FileUtils.touch(outputFile);
+ toAvro(parquetFile(), outputFile, false);
+ }
}