This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 49e8627  PARQUET-1599: Fix to-avro to respect the overwrite option (#650)
49e8627 is described below

commit 49e862704e7da7149493a953ccc18515d1eada88
Author: Kengo Seki <[email protected]>
AuthorDate: Sat Apr 11 16:01:27 2020 +0900

    PARQUET-1599: Fix to-avro to respect the overwrite option (#650)
    
    * PARQUET-1599: Fix to-avro to respect the overwrite option
    
    * Address the same problem on SchemaCommand
    
    * Remove unused variables
    
    * Consolidate redundant try clauses
---
 .../java/org/apache/parquet/cli/BaseCommand.java   | 24 ++++++++++++++++-
 .../apache/parquet/cli/commands/SchemaCommand.java | 11 ++------
 .../apache/parquet/cli/commands/ToAvroCommand.java | 15 +++--------
 .../apache/parquet/cli/commands/AvroFileTest.java  | 23 ++++++++++++-----
 .../parquet/cli/commands/SchemaCommandTest.java    | 30 ++++++++++++++++++++++
 .../parquet/cli/commands/ToAvroCommandTest.java    | 19 ++++++++++++++
 6 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index cdef53d..c994a32 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -140,15 +140,37 @@ public abstract class BaseCommand implements Command, Configurable {
     return create(filename, false);
   }
 
+  /**
+   * Creates a file and returns an open {@link FSDataOutputStream}.
+   *
+   * If the file does not have a file system scheme, this uses the default FS.
+   *
+   * This will neither produce checksum files nor overwrite a file that already
+   * exists.
+   *
+   * @param filename The filename to create
+   * @return An open FSDataOutputStream
+   * @throws IOException if there is an error creating the file
+   */
+  public FSDataOutputStream createWithNoOverwrite(String filename)
+    throws IOException {
+    return create(filename, true, false);
+  }
+
   private FSDataOutputStream create(String filename, boolean noChecksum)
       throws IOException {
+    return create(filename, noChecksum, true);
+  }
+
+  private FSDataOutputStream create(String filename, boolean noChecksum, 
boolean overwrite)
+    throws IOException {
     Path filePath = qualifiedPath(filename);
     // even though it was qualified using the default FS, it may not be in it
     FileSystem fs = filePath.getFileSystem(getConf());
     if (noChecksum && fs instanceof ChecksumFileSystem) {
       fs = ((ChecksumFileSystem) fs).getRawFileSystem();
     }
-    return fs.create(filePath, true /* overwrite */);
+    return fs.create(filePath, overwrite);
   }
 
   /**
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
index ea2306f..ca29dd0 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Lists;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.util.Formats;
 import org.apache.avro.file.SeekableInput;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -80,14 +79,8 @@ public class SchemaCommand extends BaseCommand {
       String source = targets.get(0);
 
       if (outputPath != null) {
-        Path outPath = qualifiedPath(outputPath);
-        FileSystem outFS = outPath.getFileSystem(getConf());
-        if (overwrite && outFS.exists(outPath)) {
-          console.debug("Deleting output file {} (already exists)", outPath);
-          outFS.delete(outPath);
-        }
-
-        try (OutputStream out = create(outputPath)) {
+        try (OutputStream out = overwrite ?
+          create(outputPath) : createWithNoOverwrite(outputPath)) {
           out.write(getSchema(source).getBytes(StandardCharsets.UTF_8));
         }
       } else {
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
index d659109..2e04d74 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ToAvroCommand.java
@@ -29,7 +29,6 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumWriter;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.util.Codecs;
@@ -37,6 +36,7 @@ import org.apache.parquet.cli.util.Schemas;
 import org.slf4j.Logger;
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
 
 import static org.apache.avro.generic.GenericData.Record;
@@ -92,16 +92,8 @@ public class ToAvroCommand extends BaseCommand {
     } else {
       schema = getAvroSchema(source);
     }
-    final Schema projection = filterSchema(schema, columns);
-
-    Path outPath = qualifiedPath(outputPath);
-    try (FileSystem outFS = outPath.getFileSystem(getConf())) {
-      if (overwrite && outFS.exists(outPath)) {
-        console.debug("Deleting output file {} (already exists)", outPath);
-        outFS.delete(outPath);
-      }
-    }
 
+    final Schema projection = filterSchema(schema, columns);
     Iterable<Record> reader = openDataFile(source, projection);
     boolean threw = true;
     long count = 0;
@@ -109,7 +101,8 @@ public class ToAvroCommand extends BaseCommand {
     DatumWriter<Record> datumWriter = new GenericDatumWriter<>(schema);
     try (DataFileWriter<Record> fileWriter = new 
DataFileWriter<>(datumWriter)) {
       fileWriter.setCodec(codecFactory);
-      try (DataFileWriter<Record> writer=fileWriter.create(projection, 
create(outputPath))) {
+      try (OutputStream os = overwrite ? create(outputPath) : 
createWithNoOverwrite(outputPath);
+           DataFileWriter<Record> writer = fileWriter.create(projection, os)) {
         for (Record record : reader) {
           writer.append(record);
           count += 1;
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
index 40c05c7..1aa6f98 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/AvroFileTest.java
@@ -26,19 +26,28 @@ import java.util.Arrays;
 
 public class AvroFileTest extends ParquetFileTest {
 
-  protected File toAvro(File parquetFile) throws IOException {
-    return toAvro(parquetFile, "GZIP");
+  protected File toAvro(File inputFile) throws IOException {
+    return toAvro(inputFile, "GZIP");
   }
 
-  protected File toAvro(File parquetFile, String compressionCodecName) throws 
IOException {
+  protected File toAvro(File inputFile, String compressionCodecName) throws 
IOException {
+    File outputFile = new File(getTempFolder(), getClass().getSimpleName() + 
".avro");
+    return toAvro(inputFile, outputFile, false, compressionCodecName);
+  }
+
+  protected File toAvro(File inputFile, File outputFile, boolean overwrite) 
throws IOException {
+    return toAvro(inputFile, outputFile, overwrite, "GZIP");
+  }
+
+  protected File toAvro(File inputFile, File outputFile, boolean overwrite, 
String compressionCodecName) throws IOException {
     ToAvroCommand command = new ToAvroCommand(createLogger());
-    command.targets = Arrays.asList(parquetFile.getAbsolutePath());
-    File output = new File(getTempFolder(), getClass().getSimpleName() + 
".avro");
-    command.outputPath = output.getAbsolutePath();
+    command.targets = Arrays.asList(inputFile.getAbsolutePath());
+    command.outputPath = outputFile.getAbsolutePath();
     command.compressionCodecName = compressionCodecName;
+    command.overwrite = overwrite;
     command.setConf(new Configuration());
     int exitCode = command.run();
     assert(exitCode == 0);
-    return output;
+    return outputFile;
   }
 }
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java
index ad03aac..36e122b 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/SchemaCommandTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.parquet.cli.commands;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,4 +37,32 @@ public class SchemaCommandTest extends ParquetFileTest {
     command.setConf(new Configuration());
     Assert.assertEquals(0, command.run());
   }
+
+  @Test
+  public void testSchemaCommandOverwriteExistentFile() throws IOException {
+    File inputFile = parquetFile();
+    File outputFile = new File(getTempFolder(), getClass().getSimpleName() + 
".avsc");
+    FileUtils.touch(outputFile);
+    Assert.assertEquals(0, outputFile.length());
+    SchemaCommand command = new SchemaCommand(createLogger());
+    command.targets = Arrays.asList(inputFile.getAbsolutePath());
+    command.outputPath = outputFile.getAbsolutePath();
+    command.overwrite = true;
+    command.setConf(new Configuration());
+    Assert.assertEquals(0, command.run());
+    Assert.assertTrue(0 < outputFile.length());
+  }
+
+
+  @Test(expected = FileAlreadyExistsException.class)
+  public void testSchemaCommandOverwriteExistentFileWithoutOverwriteOption() 
throws IOException {
+    File inputFile = parquetFile();
+    File outputFile = new File(getTempFolder(), getClass().getSimpleName() + 
".avsc");
+    FileUtils.touch(outputFile);
+    SchemaCommand command = new SchemaCommand(createLogger());
+    command.targets = Arrays.asList(inputFile.getAbsolutePath());
+    command.outputPath = outputFile.getAbsolutePath();
+    command.setConf(new Configuration());
+    command.run();
+  }
 }
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
index 9344a78..8f5490e 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ToAvroCommandTest.java
@@ -20,6 +20,8 @@
 package org.apache.parquet.cli.commands;
 
 import com.beust.jcommander.JCommander;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -62,6 +64,7 @@ public class ToAvroCommandTest extends AvroFileTest {
       .addObject(cmd)
       .build()
       .parse(
+        "--overwrite",
         jsonInputFile.getAbsolutePath(),
         "--output",
         avroOutputFile.getAbsolutePath()
@@ -91,4 +94,20 @@ public class ToAvroCommandTest extends AvroFileTest {
   public void testToAvroCommandWithInvalidCompression() throws IOException {
     toAvro(parquetFile(), "FOO");
   }
+
+  @Test
+  public void testToAvroCommandOverwriteExistentFile() throws IOException {
+    File outputFile = new File(getTempFolder(), getClass().getSimpleName() + 
".avro");
+    FileUtils.touch(outputFile);
+    Assert.assertEquals(0, outputFile.length());
+    File avroFile = toAvro(parquetFile(), outputFile, true);
+    Assert.assertTrue(0 < avroFile.length());
+  }
+
+  @Test(expected = FileAlreadyExistsException.class)
+  public void testToAvroCommandOverwriteExistentFileWithoutOverwriteOption() 
throws IOException {
+    File outputFile = new File(getTempFolder(), getClass().getSimpleName() + 
".avro");
+    FileUtils.touch(outputFile);
+    toAvro(parquetFile(), outputFile, false);
+  }
 }

Reply via email to