Author: daijy
Date: Tue May 12 07:01:05 2015
New Revision: 1678878

URL: http://svn.apache.org/r1678878
Log:
PIG-4496: Fix CBZip2InputStream to close underlying stream

Added:
    pig/trunk/test/org/apache/pig/test/utils/CloseAwareFSDataInputStream.java
    pig/trunk/test/org/apache/pig/test/utils/CloseAwareOutputStream.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
    pig/trunk/test/org/apache/pig/test/TestBZip.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1678878&r1=1678877&r2=1678878&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May 12 07:01:05 2015
@@ -76,6 +76,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4496: Fix CBZip2InputStream to close underlying stream (petersla via daijy)
+
 PIG-4528: Fix a typo in src/docs/src/documentation/content/xdocs/basic.xml (namusyaka via daijy)
 
 PIG-4532: Pig Documentation contains typo for AvroStorage (fredericschmaljohann via daijy)

Modified: pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=1678878&r1=1678877&r2=1678878&view=diff
==============================================================================
--- pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Tue May 12 07:01:05 2015
@@ -223,6 +223,10 @@ public class CBZip2InputStream extends I
 
     @Override
     public int read() throws IOException {
+        if (this.innerBsStream == null) {
+            throw new IOException("stream closed");
+        }
+
         if (streamEnd) {
             return -1;
         } else {
@@ -264,6 +268,18 @@ public class CBZip2InputStream extends I
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        if (this.innerBsStream == null) {
+            return;
+        }
+        try {
+            innerBsStream.close();
+        } finally {
+            this.innerBsStream = null;
+        }
+    }
+
     /**
      * getPos is used by the caller to know when the processing of the current 
      * {@link InputSplit} is complete. In this method, as we read each bzip
@@ -291,7 +307,6 @@ public class CBZip2InputStream extends I
             magic4 = bsGetUChar();
             if (magic1 != 'B' || magic2 != 'Z' || 
                     magic3 != 'h' || magic4 < '1' || magic4 > '9') {
-                bsFinishedWithStream();
                 streamEnd = true;
                 return;
             }
@@ -308,7 +323,6 @@ public class CBZip2InputStream extends I
     
     private void initBlock(boolean searchForMagic) throws IOException {
         if (readCount >= readLimit) {
-            bsFinishedWithStream();
             streamEnd = true;
             return;
         }
@@ -408,7 +422,6 @@ public class CBZip2InputStream extends I
                throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
                                + "Loading of concatenated bz2 files is not supported");
         }
-        bsFinishedWithStream();
         streamEnd = true;
     }
 
@@ -424,14 +437,6 @@ public class CBZip2InputStream extends I
         cadvise("CRC error");
     }
 
-    private void bsFinishedWithStream() {
-        if (this.innerBsStream != null) {
-            if (this.innerBsStream != System.in) {
-                this.innerBsStream = null;
-            }
-        }
-    }
-
     private void bsSetStream(FSDataInputStream f) {
         innerBsStream = f;
         bsLive = 0;

Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1678878&r1=1678877&r2=1678878&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Tue May 12 07:01:05 2015
@@ -45,16 +45,23 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.utils.CloseAwareFSDataInputStream;
+import org.apache.pig.test.utils.CloseAwareOutputStream;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class TestBZip {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = MiniGenericCluster.buildCluster();
@@ -73,10 +80,9 @@ public class TestBZip {
     public void testBzipInPig() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".bz2");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.bz2");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
 
@@ -121,9 +127,6 @@ public class TestBZip {
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-
-        in.delete();
-        Util.deleteFile(cluster, clusterOutput);
     }
 
    /**
@@ -133,10 +136,9 @@ public class TestBZip {
     public void testBzipInPig2() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".bz2");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.bz2");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
 
@@ -181,9 +183,6 @@ public class TestBZip {
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-
-        in.delete();
-        out.delete();
     }
 
     //see PIG-2391
@@ -197,10 +196,9 @@ public class TestBZip {
         };
 
         // bzip compressed input
-        File in = File.createTempFile("junit", ".bz2");
+        File in = folder.newFile("junit-in.bz2");
         String compressedInputFileName = in.getAbsolutePath();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
-        in.deleteOnExit();
 
         try {
             CBZip2OutputStream cos =
@@ -230,7 +228,6 @@ public class TestBZip {
                 it2.next();
             }
         } finally {
-            in.delete();
             Util.deleteFile(cluster, "intermediate.bz");
             Util.deleteFile(cluster, "final.bz");
         }
@@ -249,9 +246,8 @@ public class TestBZip {
         };
 
         // bzip compressed input
-        File in = File.createTempFile("junit", ".bz2");
+        File in = folder.newFile("junit-in.bz2");
         String compressedInputFileName = in.getAbsolutePath();
-        in.deleteOnExit();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
@@ -291,7 +287,6 @@ public class TestBZip {
             assertFalse(it2.hasNext());
 
         } finally {
-            in.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
             Util.deleteFile(cluster, clusterCompressedFilePath);
         }
@@ -305,10 +300,9 @@ public class TestBZip {
      public void testEmptyBzipInPig() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".tmp");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.tmp");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
 
@@ -336,10 +330,6 @@ public class TestBZip {
 
         pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
         pig.openIterator("B");
-
-        in.delete();
-        Util.deleteFile(cluster, clusterOutputFilePath);
-
     }
 
     /**
@@ -347,8 +337,7 @@ public class TestBZip {
      */
     @Test
     public void testEmptyBzip() throws Exception {
-        File tmp = File.createTempFile("junit", ".tmp");
-        tmp.deleteOnExit();
+        File tmp = folder.newFile("junit.tmp");
         CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
                 tmp));
         cos.close();
@@ -358,7 +347,25 @@ public class TestBZip {
                 fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
-        tmp.delete();
+    }
+
+    @Test
+    public void testInnerStreamGetsClosed() throws Exception {
+        File tmp = folder.newFile("junit.tmp");
+
+        CloseAwareOutputStream out = new CloseAwareOutputStream(new FileOutputStream(tmp));
+        CBZip2OutputStream cos = new CBZip2OutputStream(out);
+        assertFalse(out.isClosed());
+        cos.close();
+        assertTrue(out.isClosed());
+
+        FileSystem fs = FileSystem.getLocal(new Configuration(false));
+        Path path = new Path(tmp.getAbsolutePath());
+        CloseAwareFSDataInputStream in = new CloseAwareFSDataInputStream(fs.open(path));
+        CBZip2InputStream cis = new CBZip2InputStream(in, -1, tmp.length());
+        assertFalse(in.isClosed());
+        cis.close();
+        assertTrue(in.isClosed());
     }
 
     /**
@@ -556,14 +563,12 @@ public class TestBZip {
         };
 
         // bzip compressed input file1
-        File in1 = File.createTempFile("junit", ".bz2");
+        File in1 = folder.newFile("junit-in1.bz2");
         String compressedInputFileName1 = in1.getAbsolutePath();
-        in1.deleteOnExit();
 
         // file2
-        File in2 = File.createTempFile("junit", ".bz2");
+        File in2 = folder.newFile("junit-in2.bz2");
         String compressedInputFileName2 = in2.getAbsolutePath();
-        in1.deleteOnExit();
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
         Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
@@ -614,8 +619,6 @@ public class TestBZip {
             assertFalse(it2.hasNext());
 
         } finally {
-            in1.delete();
-            in2.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
         }
 

Added: pig/trunk/test/org/apache/pig/test/utils/CloseAwareFSDataInputStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/CloseAwareFSDataInputStream.java?rev=1678878&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/CloseAwareFSDataInputStream.java (added)
+++ pig/trunk/test/org/apache/pig/test/utils/CloseAwareFSDataInputStream.java Tue May 12 07:01:05 2015
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class CloseAwareFSDataInputStream extends FSDataInputStream {
+
+  private boolean isClosed;
+
+  public CloseAwareFSDataInputStream(InputStream in) throws IOException {
+    super(in);
+    isClosed = false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    isClosed = true;
+  }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+}

Added: pig/trunk/test/org/apache/pig/test/utils/CloseAwareOutputStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/CloseAwareOutputStream.java?rev=1678878&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/CloseAwareOutputStream.java (added)
+++ pig/trunk/test/org/apache/pig/test/utils/CloseAwareOutputStream.java Tue May 12 07:01:05 2015
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class CloseAwareOutputStream extends OutputStream {
+
+  private final OutputStream out;
+  private boolean isClosed;
+
+  public CloseAwareOutputStream(OutputStream out) {
+      this.out = out;
+      this.isClosed = false;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+      out.write(b);
+  }
+
+  @Override
+  public void close() throws IOException {
+      out.close();
+      isClosed = true;
+  }
+
+  public boolean isClosed() {
+      return isClosed;
+  }
+}


Reply via email to