Repository: tajo
Updated Branches:
  refs/heads/master 145c17f74 -> b05ade6a6


TAJO-1903: Insert clause occassionally fails on S3.

Closes #798


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b05ade6a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b05ade6a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b05ade6a

Branch: refs/heads/master
Commit: b05ade6a68403cb4f075eb5c2cbe0c4f69f98cf5
Parents: 145c17f
Author: Jinho Kim <[email protected]>
Authored: Thu Oct 8 17:25:45 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Thu Oct 8 17:25:45 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../apache/tajo/storage/avro/AvroAppender.java  |  7 +---
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  6 +--
 .../sequencefile/SequenceFileAppender.java      |  5 ---
 .../tajo/storage/text/DelimitedTextFile.java    | 17 +--------
 .../org/apache/tajo/storage/TestStorages.java   | 39 ++++++++++++++++++++
 6 files changed, 46 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b05ade6a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5870d67..295b5ca 100644
--- a/CHANGES
+++ b/CHANGES
@@ -333,6 +333,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1903: Insert clause occassionally fails on S3. (jinho)
+
     TAJO-1912: Selection from aliased schemaless tables throws an error. 
     (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b05ade6a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index ff0e8c0..e54fb80 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -36,7 +36,6 @@ import org.apache.tajo.storage.FileAppender;
 import org.apache.tajo.storage.TableStatistics;
 import org.apache.tajo.storage.Tuple;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -71,10 +70,8 @@ public class AvroAppender extends FileAppender {
    */
   public void init() throws IOException {
     FileSystem fs = path.getFileSystem(conf);
-    if (!fs.exists(path.getParent())) {
-      throw new FileNotFoundException(path.toString());
-    }
-    FSDataOutputStream outputStream = fs.create(path);
+
+    FSDataOutputStream outputStream = fs.create(path, false);
 
     avroSchema = AvroUtil.getAvroSchema(meta, conf);
     avroFields = avroSchema.getFields();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b05ade6a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 38a4761..ed55506 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -727,10 +727,6 @@ public class RCFile {
     public void init() throws IOException {
       fs = path.getFileSystem(conf);
 
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
       if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
         String codecClassname = 
this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
         try {
@@ -773,7 +769,7 @@ public class RCFile {
         columnBuffers[i] = new ColumnBuffer();
       }
 
-      init(conf, fs.create(path, true, 4096, (short) 3, 
fs.getDefaultBlockSize(), null), codec, metadata);
+      init(conf, fs.create(path, false, 4096, (short) 3, 
fs.getDefaultBlockSize(), null), codec, metadata);
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b05ade6a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index ad622fe..b1a14e3 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -40,7 +40,6 @@ import 
org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
 import org.apache.tajo.util.BytesUtils;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 
 public class SequenceFileAppender extends FileAppender {
@@ -95,10 +94,6 @@ public class SequenceFileAppender extends FileAppender {
       nullChars = nullCharacters.getBytes();
     }
 
-    if (!fs.exists(path.getParent())) {
-      throw new FileNotFoundException(path.toString());
-    }
-
     if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
       String codecName = 
this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
       codecFactory = new CompressionCodecFactory(conf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b05ade6a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index b5155d4..c0ee784 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -39,7 +39,6 @@ import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
 import org.apache.tajo.unit.StorageUnit;
@@ -47,7 +46,6 @@ import org.apache.tajo.util.ReflectionUtil;
 
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -133,10 +131,6 @@ public class DelimitedTextFile {
 
     @Override
     public void init() throws IOException {
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
       if (enabledStats) {
         this.stats = new TableStatistics(this.schema);
       }
@@ -163,19 +157,12 @@ public class DelimitedTextFile {
         String extension = codec.getDefaultExtension();
         compressedPath = path.suffix(extension);
 
-        if (fs.exists(compressedPath)) {
-          throw new AlreadyExistsStorageException(compressedPath);
-        }
-
-        fos = fs.create(compressedPath);
+        fos = fs.create(compressedPath, false);
         deflateFilter = codec.createOutputStream(fos, compressor);
         outputStream = new DataOutputStream(deflateFilter);
 
       } else {
-        if (fs.exists(path)) {
-          throw new AlreadyExistsStorageException(path);
-        }
-        fos = fs.create(path);
+        fos = fs.create(path, false);
         outputStream = new DataOutputStream(new BufferedOutputStream(fos));
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b05ade6a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index dafaf05..1f6168a 100644
--- 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ 
b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.tajo.BuiltinStorages;
@@ -57,6 +58,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TestStorages {
@@ -1154,4 +1156,41 @@ public class TestStorages {
       OldStorageManager.clearCache();
     }
   }
+
+  @Test
+  public void testFileAlreadyExists() throws IOException {
+
+    if (internalType) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(dataFormat);
+    meta.setOptions(CatalogUtil.newDefaultProperty(dataFormat));
+    if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Path tablePath = new Path(testDir, "testFileAlreadyExists.data");
+
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+    appender.close();
+
+    try {
+      appender = sm.getAppender(meta, schema, tablePath);
+      appender.init();
+      if (BuiltinStorages.ORC.equals(dataFormat)) {
+        appender.close();
+      }
+      fail(dataFormat);
+    } catch (IOException e) {
+    } finally {
+      IOUtils.cleanup(null, appender);
+    }
+  }
 }
\ No newline at end of file

Reply via email to