Author: szita
Date: Sat Jul 22 11:38:47 2017
New Revision: 1802676

URL: http://svn.apache.org/viewvc?rev=1802676&view=rev
Log:
PIG-3655: BinStorage and InterStorage approach to record markers is broken 
(szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
    pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
    pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
    pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
    pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
    pig/trunk/test/org/apache/pig/test/TestFRJoin2.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Jul 22 11:38:47 2017
@@ -38,6 +38,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3655: BinStorage and InterStorage approach to record markers is broken 
(szita)
+
 PIG-5274: TestEvalPipelineLocal#testSetLocationCalledInFE is failing in spark 
mode after PIG-5157 (nkollar via szita)
 
 PIG-4767: Partition filter not pushed down when filter clause references 
variable from another load path (knoguchi)

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Jul 22 11:38:47 2017
@@ -40,6 +40,24 @@ public class PigConfiguration {
      */
     public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = 
"pig.auto.local.input.maxbytes";
 
+
+    /**
+     * Sets the length of the record markers in the binary files produced by Pig between jobs.
+     * A longer byte sequence means less chance of collision with actual data;
+     * a shorter sequence means less overhead.
+     */
+    public static final String PIG_INTERSTORAGE_SYNCMARKER_SIZE = 
"pig.interstorage.syncmarker.size";
+    public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX = 16;
+    public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT = 10;
+    public static final int PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN = 2;
+
+    /**
+     * Defines the interval (in bytes) at which a sync marker should be written into the binary file
+     */
+    public static final String PIG_INTERSTORAGE_SYNCMARKER_INTERVAL = 
"pig.interstorage.syncmarker.interval";
+    public static final long PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT = 
2000;
+
+
     /**
      * Boolean value used to enable or disable fetching without a mapreduce 
job for DUMP. True by default
      */
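
For illustration, a minimal sketch (not part of this commit; the class name is hypothetical) of how the new keys could be set on a Hadoop Configuration before a Pig job runs; the values shown are arbitrary:

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;

public class SyncMarkerConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Request a 12-byte sync marker (InterStorage clamps requests to the [2, 16] range).
        conf.setInt(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, 12);
        // Emit a sync marker roughly every 1 MB of tuple data instead of the 2000-byte default.
        conf.setLong(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL, 1024L * 1024L);
        System.out.println(conf.getInt(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, -1));
    }
}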

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java Sat Jul 22 
11:38:47 2017
@@ -42,16 +42,23 @@ import org.apache.pig.data.Tuple;
 public class InterRecordReader extends RecordReader<Text, Tuple> {
 
   private long start;
-  private long pos;
+  private long lastDataPos;
   private long end;
   private BufferedPositionedInputStream in;
   private Tuple value = null;
-  public static final int RECORD_1 = 0x01;
-  public static final int RECORD_2 = 0x02;
-  public static final int RECORD_3 = 0x03;
   private DataInputStream inData = null;
   private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
 
+  private byte[] syncMarker;
+  private long lastSyncPos = -1;
+  private long syncMarkerInterval;
+  private long dataBytesSeen = 0;
+
+  public InterRecordReader(int syncMarkerLength, long syncMarkerInterval) {
+      this.syncMarker = new byte[syncMarkerLength];
+      this.syncMarkerInterval = syncMarkerInterval;
+  }
+
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
@@ -60,63 +67,131 @@ public class InterRecordReader extends R
     end = start + split.getLength();
     final Path file = split.getPath();
 
-    // open the file and seek to the start of the split
+    // open the file
     FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
-    if (start != 0) {
-        fileIn.seek(start);
+
+    // read the magic byte sequence serving as the record marker, but only if the file is not empty
+    if (!(start == 0 && end == 0)) {
+        fileIn.readFully(0, syncMarker, 0, syncMarker.length);
     }
+
+    //seek to the start of the split
+    fileIn.seek(start);
+
     in = new BufferedPositionedInputStream(fileIn, start);
     inData = new DataInputStream(in);
   }
-  
-  public boolean nextKeyValue() throws IOException {
+
+
+    /**
+     * Skips to the next sync marker
+     * @return true if a marker was observed, false if EOF or the end of the split was reached
+     * @throws IOException
+     */
+  private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
       int b = 0;
-      //    skip to next record
-      while (true) {
-          if (in == null || in.getPosition() >=end) {
-              return false;
-          }
-          // check if we saw RECORD_1 in our last attempt
-          // this can happen if we have the following 
-          // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
-          // After reading the second RECORD_1 in the above
-          // sequence, we should not look for RECORD_1 again
-          if(b != RECORD_1) {
+outer:while (b != -1) {
+          if (b != syncMarker[0]) {
+
+              //If we read through a whole split without seeing a marker, we shouldn't proceed,
+              // because the remaining records belong to the next split, which another reader would pick up too
+              if (in.getPosition() >= end) {
+                  return false;
+              }
               b = in.read();
-              if(b != RECORD_1 && b != -1) {
+              if ((byte) b != syncMarker[0] && b != -1) {
                   continue;
               }
-              if(b == -1) return false;
+              if (b == -1) return false;
           }
-          b = in.read();
-          if(b != RECORD_2 && b != -1) {
-              continue;
+          int i = 1;
+          while (i < syncMarker.length) {
+              b = in.read();
+              if (b == -1) return false;
+              if ((byte) b != syncMarker[i]) {
+                  continue outer;
+              }
+              ++i;
           }
-          if(b == -1) return false;
+          lastSyncPos = in.getPosition();
+          return true;
+      }
+      return false;
+  }
+
+    /**
+     * Reads a sync marker
+     * @return true if a sync marker was read, false if EOF was reached
+     * @throws IOException thrown if neither EOF nor a proper sync marker was found
+     */
+  private boolean readSyncFullyOrEOF() throws IOException {
+      int b = in.read();
+      if (b == -1) {
+          //EOF reached
+          return false;
+      }
+      if ((byte) b != syncMarker[0]) {
+          throw new IOException("Corrupt data file, expected sync marker at 
position " + in.getPosition());
+      }
+      int i = 1;
+      while (i < syncMarker.length) {
           b = in.read();
-          if(b != RECORD_3 && b != -1) {
-              continue;
+          if ((byte) b != syncMarker[i]) {
+              throw new IOException("Corrupt data file, expected sync marker 
at position " + in.getPosition());
           }
-          if(b == -1) return false;
-          b = in.read();
-          if(!BinInterSedes.isTupleByte((byte) b) &&
-                  b != -1) {
-              continue;
+          ++i;
+      }
+      lastSyncPos = in.getPosition();
+      return true;
+
+  }
+
+  private boolean readDataOrEOF() throws IOException {
+      long preDataPos = in.getPosition();
+      int b = in.read();
+      if(!BinInterSedes.isTupleByte((byte) b) ) {
+          if (b == -1) {
+              //EOF reached
+              return false;
+          } else {
+              throw new IOException("Corrupt data file, expected tuple type 
byte, but seen " + b);
           }
-          if(b == -1) return false;
-          break;
       }
       try {
-          // if we got here, we have seen 
RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
-          // sequence - lets now read the contents of the tuple 
           value =  (Tuple)sedes.readDatum(inData, (byte)b);
-          pos=in.getPosition();
+          lastDataPos = in.getPosition();
+          dataBytesSeen += (lastDataPos-preDataPos);
           return true;
       } catch (ExecException ee) {
           throw ee;
       }
+  }
+
+  public boolean nextKeyValue() throws IOException {
+
+      //No marker has been seen, look for next marker
+      if (lastSyncPos == -1) {
+          if (!skipUntilMarkerOrSplitEndOrEOF()) {
+              return false;
+          }
+      }
+
+      //If we've read an amount of data greater than or equal to the sync interval, we expect a sync marker or EOF
+      if (dataBytesSeen >= syncMarkerInterval) {
+          boolean isEOF = !readSyncFullyOrEOF();
+          if (isEOF) {
+              return false;
+          }
+          dataBytesSeen = 0;
+          //If we've just seen a (non-first) sync marker that lay completely in the next split, then we need to stop
+          if (in.getPosition()-syncMarker.length >= end) {
+              return false;
+          }
+      }
 
+      //Sync marker has been seen, expect data
+      return readDataOrEOF();
   }
 
   @Override
@@ -138,7 +213,7 @@ public class InterRecordReader extends R
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f, (lastDataPos - start) / (float)(end - start));
     }
   }
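
As an aside, the core of the new scanning logic in skipUntilMarkerOrSplitEndOrEOF can be sketched in isolation as below (a simplified, standalone illustration that omits split-boundary and position tracking; like the committed code it only re-checks the first marker byte after a mismatch):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkerScanSketch {
    // Consumes bytes until the full marker has been read; returns true if found, false on EOF.
    static boolean skipUntilMarker(InputStream in, byte[] marker) throws IOException {
        int matched = 0;
        int b;
        while ((b = in.read()) != -1) {
            if ((byte) b == marker[matched]) {
                matched++;
                if (matched == marker.length) {
                    return true;
                }
            } else {
                // Restart matching; re-check whether this byte itself starts a new marker occurrence.
                matched = ((byte) b == marker[0]) ? 1 : 0;
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        byte[] marker = {0x0A, 0x0B, 0x0C};
        byte[] data = {0x01, 0x0A, 0x0A, 0x0B, 0x0C, 0x7F};
        InputStream in = new ByteArrayInputStream(data);
        System.out.println(skipUntilMarker(in, marker)); // true; the stream is now positioned just past the marker
    }
}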
   

Modified: pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterRecordWriter.java Sat Jul 22 
11:38:47 2017
@@ -17,12 +17,16 @@
  */
 package org.apache.pig.impl.io;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Time;
 import org.apache.pig.data.InterSedes;
 import org.apache.pig.data.InterSedesFactory;
 import org.apache.pig.data.Tuple;
@@ -35,20 +39,34 @@ import org.apache.pig.data.Tuple;
 public class InterRecordWriter extends
         RecordWriter<org.apache.hadoop.io.WritableComparable, Tuple> {
 
-    public static final int RECORD_1 = 0x01;
-    public static final int RECORD_2 = 0x02;
-    public static final int RECORD_3 = 0x03;
     private static InterSedes sedes = 
InterSedesFactory.getInterSedesInstance();
+
+    private byte[] syncMarker;
+    private long lastSyncPos = -1;
+    private long syncMarkerInterval;
     /**
      * the outputstream to write out on
      */
-    private DataOutputStream out;
+    private FSDataOutputStream out;
     
     /**
      * 
      */
-    public InterRecordWriter(DataOutputStream out) {
+    public InterRecordWriter(FSDataOutputStream out, int syncMarkerLength, 
long syncMarkerInterval) {
         this.out = out;
+        this.syncMarkerInterval = syncMarkerInterval;
+        syncMarker = new byte[syncMarkerLength];
+
+        try {
+            MessageDigest digester = MessageDigest.getInstance("MD5");
+            long time = Time.now();
+            digester.update((new UID()+"@"+time).getBytes());
+            byte[] generatedMarker = digester.digest();
+            System.arraycopy(generatedMarker, 0, syncMarker, 0, 
syncMarkerLength);
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+
     }
 
     /* (non-Javadoc)
@@ -66,10 +84,11 @@ public class InterRecordWriter extends
     @Override
     public void write(WritableComparable wc, Tuple t) throws IOException,
             InterruptedException {
-        // we really only want to write the tuple (value) out here
-        out.write(RECORD_1);
-        out.write(RECORD_2);
-        out.write(RECORD_3);
+        // we really only want to write the tuple (value) out here (and a sync marker before that if necessary)
+        if (lastSyncPos == -1 || out.getPos() >= (lastSyncPos + 
syncMarkerInterval)) {
+            out.write(syncMarker);
+            lastSyncPos = out.getPos();
+        }
         sedes.writeDatum(out, t);
         
     }
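
The writer derives its per-file marker from an MD5 digest of a fresh java.rmi.server.UID plus a timestamp, truncated to the configured length; below is a standalone sketch of that generation step (hypothetical class name, using System.currentTimeMillis() in place of Hadoop's Time.now()):

import java.rmi.server.UID;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class SyncMarkerGenSketch {
    // Builds a pseudo-random marker of the requested length (at most 16, the MD5 digest size).
    static byte[] generateMarker(int length) {
        try {
            MessageDigest digester = MessageDigest.getInstance("MD5");
            digester.update((new UID() + "@" + System.currentTimeMillis()).getBytes());
            byte[] digest = digester.digest();           // 16 bytes
            byte[] marker = new byte[length];
            System.arraycopy(digest, 0, marker, 0, length);
            return marker;
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        for (byte b : generateMarker(10)) {
            System.out.printf("%02x", b);
        }
        System.out.println();
    }
}

Since an MD5 digest is 16 bytes, the maximum configurable marker size of 16 falls out naturally.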

Modified: pig/trunk/src/org/apache/pig/impl/io/InterStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/InterStorage.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/InterStorage.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/InterStorage.java Sat Jul 22 11:38:47 
2017
@@ -39,6 +39,7 @@ import org.apache.pig.Expression;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
@@ -62,10 +63,10 @@ implements StoreFuncInterface, LoadMetad
 
     private static final Log mLog = LogFactory.getLog(InterStorage.class);
     public static final String useLog = "Pig Internal storage in use";
-    
+
     private InterRecordReader recReader = null;
     private InterRecordWriter recWriter = null;
-    
+
     /**
      * Simple binary nested reader format
      */
@@ -102,7 +103,9 @@ implements StoreFuncInterface, LoadMetad
         public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
                 TaskAttemptContext context) throws IOException,
                 InterruptedException {
-            return new InterRecordReader();
+            return new 
InterRecordReader(retrieveMarkerLengthFromConf(context.getConfiguration()),
+                                         
retrieveMarkerIntervalFromConf(context.getConfiguration())
+            );
         }
 
     }
@@ -141,7 +144,10 @@ implements StoreFuncInterface, LoadMetad
             Path file = getDefaultWorkFile(job, "");
             FileSystem fs = file.getFileSystem(conf);
             FSDataOutputStream fileOut = fs.create(file, false);
-            return new InterRecordWriter(fileOut);
+            return new InterRecordWriter(fileOut,
+                    retrieveMarkerLengthFromConf(job.getConfiguration()),
+                    retrieveMarkerIntervalFromConf(job.getConfiguration())
+            );
         }
     }
 
@@ -208,4 +214,22 @@ implements StoreFuncInterface, LoadMetad
     public void cleanupOnSuccess(String location, Job job) throws IOException {
         // DEFAULT: do nothing
     }
+
+    private static int retrieveMarkerLengthFromConf(Configuration conf) {
+        int requestedLength = 
conf.getInt(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE,
+                PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT);
+
+        if (requestedLength > 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX) {
+            requestedLength = 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX;
+        } else if (requestedLength < 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN) {
+            requestedLength = 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN;
+        }
+
+        return requestedLength;
+    }
+
+    private static long retrieveMarkerIntervalFromConf(Configuration conf) {
+        return 
conf.getLong(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL,
+                PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
+    }
 }
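
The clamping in retrieveMarkerLengthFromConf is equivalent to a simple min/max clamp; a minimal standalone illustration (hypothetical class, not part of the commit):

public class MarkerLengthClampSketch {
    static final int MIN = 2;   // PIG_INTERSTORAGE_SYNCMARKER_SIZE_MIN
    static final int MAX = 16;  // PIG_INTERSTORAGE_SYNCMARKER_SIZE_MAX

    // Requested lengths outside [MIN, MAX] are silently clamped rather than rejected.
    static int clamp(int requested) {
        return Math.max(MIN, Math.min(MAX, requested));
    }

    public static void main(String[] args) {
        System.out.println(clamp(32)); // 16
        System.out.println(clamp(10)); // 10
        System.out.println(clamp(1));  // 2
    }
}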

Modified: pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java (original)
+++ pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java Sat Jul 22 11:38:47 
2017
@@ -44,6 +44,7 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -506,30 +507,20 @@ public class TestSchemaTuple {
         File temp = File.createTempFile("tmp", "tmp");
         temp.deleteOnExit();
         FileOutputStream fos = new FileOutputStream(temp);
-        DataOutputStream dos = new DataOutputStream(fos);
+        FSDataOutputStream dos = new FSDataOutputStream(fos, null);
 
-        InterRecordWriter writer = new InterRecordWriter(dos);
+        InterRecordWriter writer = new InterRecordWriter(dos,
+                PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT,
+                PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
 
-        // We add these lines because a part of the InterStorage logic
-        // is the ability to seek to the next Tuple based on a magic set
-        // of bytes. This emulates the random byes that will be present
-        // at the beginning of a split.
-        dos.writeByte(r.nextInt());
-        dos.writeByte(r.nextInt());
-        dos.writeByte(r.nextInt());
-        dos.writeByte(r.nextInt());
-        dos.writeByte(r.nextInt());
-        dos.writeByte(r.nextInt());
+        // This test does not cover the case of overlapping record bytes that may be present at the
+        // beginning of a split; for that, see org.apache.pig.test.TestBinInterSedes#testInterStorageSyncMarker()
 
         for (int i = 0; i < sz; i++) {
             SchemaTuple<?> st = (SchemaTuple<?>)tf.newTuple();
             fillWithData(st);
             writer.write(null, st);
             written.add(st);
-
-            dos.writeByte(r.nextInt());
-            dos.writeByte(r.nextInt());
-            dos.writeByte(r.nextInt());
         }
         writer.close(null);
 
@@ -541,7 +532,8 @@ public class TestSchemaTuple {
 
         InputSplit is = new FileSplit(new Path(temp.getAbsolutePath()), 0, 
temp.length(), null);
 
-        InterRecordReader reader = new InterRecordReader();
+        InterRecordReader reader = new 
InterRecordReader(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT,
+                                                         
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT);
         reader.initialize(is, HadoopShims.createTaskAttemptContext(conf, 
taskId));
 
         for (int i = 0; i < sz; i++) {

Modified: pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBinInterSedes.java Sat Jul 22 
11:38:47 2017
@@ -24,11 +24,16 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.DataBag;
@@ -296,6 +301,142 @@ public class TestBinInterSedes {
         }
     }
 
+
+    /*
+      The following tests verify the reading and writing of Pig's intermediate (InterStorage) files.
+      The test records are 11, 14, 22 and 14 bytes long.
+      Below, the splits are illustrated as rows, records as [] with their size, and sync markers as [M].
+     */
+
+    /**
+     * One sync marker only and three splits where the records overlap the split ends.
+     * (The reader of the 1st split should read every record; the readers of the 2nd and 3rd splits should read no records.)
+     * [M(10)] [11] [11-
+     *  -3] [ 22 ] [7-
+     *  -7]
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSyncMarkerOneMarkerAtBeginningOnly() throws Exception {
+        testInterStorageSyncMarker(32, 10, 2000L);
+    }
+
+    /**
+     * Some sync markers are positioned so that they begin at a split's end and finish at the next split's beginning.
+     * (The reader of a split has to read until the next sync marker that has all of its bytes in a following split.)
+     * @throws Exception
+     */
+    @Test
+    public void testSyncMarkerOverlappingMarker() throws Exception {
+        /*
+         * [M(16)] [11] [M(16)] [5-
+         * -9] [M(16)] [ 22 ] [M(1-
+         * -15)] [14]
+         */
+        testInterStorageSyncMarker(48, 16, 10L);
+        /*
+         * [M(4)] [ 4-
+         *     -7] [1-
+         *    - 8 -
+         *   -5] [M(3-
+         * -1)] [ 7-
+         *    - 8 -
+         *   -7] [M(1-
+         * -3)] [  5-
+         *    - 8 -
+         *  -1]
+         */
+        testInterStorageSyncMarker(8, 4, 20L);
+    }
+
+    /**
+     * No illustration for this one to save characters. The sync marker size is over 3 times the split size, which is an
+     * extremely unlikely scenario. Markers here span over 4 splits.
+     * @throws Exception
+     */
+    @Test
+    public void testSyncMarkerLongerMarkerThanSplit() throws Exception {
+        testInterStorageSyncMarker(5, 16, 20L);
+    }
+
+    /**
+     * A sync marker is positioned at exactly the end of the first split without overlapping into the next one.
+     * (The reader of the 1st split should read past it and into the 2nd split until the next marker.)
+     *
+     * [M(2)] [11] [14] [M(2)]
+     * [  22  ] [M(2)] [ 5-
+     * -9]
+     * @throws Exception
+     */
+    @Test
+    public void testSyncMarkerMarkerOnSplitEnd() throws Exception {
+        testInterStorageSyncMarker(29, 2, 20L);
+    }
+
+    /**
+     * A sync marker is positioned at exactly the beginning of the 3rd split.
+     * (The reader of the 1st split should read the 1st and 2nd splits fully; the reader of the 2nd split should read no records.)
+     *
+     * [M(3)] [11]
+     * [    14   ]
+     * [M(3) [11-
+     *   -11 ] [3-
+     *      -11 ]
+     * @throws Exception
+     */
+    @Test
+    public void testSyncMarkerMarkerOnSplitBeginning() throws Exception {
+        testInterStorageSyncMarker(14, 3, 25L);
+    }
+
+    private void testInterStorageSyncMarker(int maxSplitSize, int syncSize, 
long syncInterval) throws Exception {
+        PigServer pigServer = new PigServer(Util.getLocalTestMode(), new 
Properties());
+
+        Properties pigProperties = pigServer.getPigContext().getProperties();
+        
pigProperties.setProperty("mapreduce.input.fileinputformat.split.maxsize", 
String.valueOf(maxSplitSize));
+        
pigProperties.setProperty(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE, 
String.valueOf(syncSize));
+        
pigProperties.setProperty(PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL,
 String.valueOf(syncInterval));
+
+        //Without proper random record markers, 0x01020327 would be identified as a marker and 0x50 as an unknown datatype
+        //ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03, 0x27, 0x50, 0x0, 0x0, 0x0}).getLong() => 72624011372134400
+
+        String[] inputData = new 
String[]{"apple\t1\t1","orange\t2\t2","kiwi\t16909095\t72624011372134400","orange\t4\t4"};
+        String[] expected = new String[] 
{"(apple,1,1)","(orange,2,2)","(kiwi,16909095,72624011372134400)","(orange,4,4)"};
+        File inputFile = Util.createInputFile("interStorageInput", "", 
inputData);
+        inputFile.deleteOnExit();
+
+        //Without proper random record markers, 0x01020327 would be identified as a marker and, although no errors are
+        // thrown, the result will contain an incorrect schema and values past this number
+        //ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03, 0x27, 0x01, 0x0, 0x0, 0x0}).getLong() => 72624010046734336
+
+        String[] inputData2 = new 
String[]{"apple\t1\t1","orange\t2\t2","kiwi\t16909095\t72624010046734336","orange\t4\t4"};
+        String[] expected2 = new String[] 
{"(apple,1,1)","(orange,2,2)","(kiwi,16909095,72624010046734336)","(orange,4,4)"};
+        File inputFile2 = Util.createInputFile("interStorageInput2", "", 
inputData2);
+        inputFile2.deleteOnExit();
+
+        File binOutputdir = new File("build/test/interStorageTest");
+        Util.deleteDirectory(binOutputdir);
+
+        String script = "A = LOAD '"+inputFile.getAbsolutePath()+"' AS 
(name:chararray, cnt:int, cnt2:long);\n" +
+                "STORE A INTO '"+binOutputdir.getAbsolutePath()+"' USING 
org.apache.pig.impl.io.InterStorage();\n" +
+                "\n" +
+                "B = LOAD '"+binOutputdir.getAbsolutePath()+"' USING 
org.apache.pig.impl.io.InterStorage();\n";
+
+        pigServer.registerQuery(script);
+        Iterator<Tuple> it = pigServer.openIterator("B");
+        Util.checkQueryOutputsAfterSortRecursive(it, expected,
+                
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("B")));
+
+        Util.deleteDirectory(binOutputdir);
+
+        pigServer.registerQuery(script.replaceAll(inputFile.getAbsolutePath(), 
inputFile2.getAbsolutePath()));
+        it = pigServer.openIterator("B");
+        Util.checkQueryOutputsAfterSortRecursive(it, expected2,
+                
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("B")));
+
+    }
+
     private void testSerTuple(Tuple t, byte[] expected) throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutput out = new DataOutputStream(baos);
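
The magic long values used in the new test data can be reproduced with the ByteBuffer computations quoted in the comments above; a standalone check (hypothetical class, not part of the commit):

import java.nio.ByteBuffer;

public class MarkerCollisionCheck {
    public static void main(String[] args) {
        // Per the test comments: 0x01 0x02 0x03 matches the old fixed RECORD_1/RECORD_2/RECORD_3 marker
        // and 0x27 passes isTupleByte, so the old reader could mistake these data bytes for a record boundary.
        long collidesWithUnknownType = ByteBuffer.wrap(
                new byte[]{0x01, 0x02, 0x03, 0x27, 0x50, 0x0, 0x0, 0x0}).getLong();
        long collidesSilently = ByteBuffer.wrap(
                new byte[]{0x01, 0x02, 0x03, 0x27, 0x01, 0x0, 0x0, 0x0}).getLong();
        System.out.println(collidesWithUnknownType); // 72624011372134400
        System.out.println(collidesSilently);        // 72624010046734336
    }
}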

Modified: pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=1802676&r1=1802675&r2=1802676&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Sat Jul 22 11:38:47 2017
@@ -414,12 +414,18 @@ public class TestFRJoin2 {
         pigServer.registerQuery("C = foreach C generate MAX(B.x) as x;");
         pigServer.registerQuery("D = join A by x, B by x, C by x using 
'repl';");
         {
-            // When the replicated input sizes=(12 + 5) is bigger than
-            // pig.join.replicated.max.bytes=16, we throw exception
+            // When the replicated input size is bigger than
+            // pig.join.replicated.max.bytes, we throw an exception
+            // Expected replicated size below:
+            //  Alias B: sync marker + 2 records (1 tuple type byte + 2 integers (0 or 1))
+            //  Alias C: sync marker + 1 record (1 tuple type byte + 1 integer (1))
+            long expectedReplicateSize = 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT + 2*(1 +1+1)
+                                       + 
PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT + 1*(1 +1);
+
             try {
                 pigServer.getPigContext().getProperties().setProperty(
                         PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES,
-                        String.valueOf(16));
+                        String.valueOf(expectedReplicateSize-1));
                 pigServer.openIterator("D");
                 Assert.fail();
             } catch (FrontendException e) {
@@ -428,10 +434,10 @@ public class TestFRJoin2 {
                         e.getCause().getCause().getCause().getMessage());
             }
 
-            // If we increase the size to 17, it should work
+            // If we increase the max size setting to the expected amount, it works
             pigServer.getPigContext().getProperties().setProperty(
                         PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES,
-                        String.valueOf(17));
+                        String.valueOf(expectedReplicateSize));
             pigServer.openIterator("D");
         }
     }
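
For reference, with the default 10-byte marker the expected replicated size in the test above works out to 28 bytes; a small standalone check of that arithmetic (hypothetical class, not part of the commit):

public class ReplicatedSizeCheck {
    public static void main(String[] args) {
        int markerSize = 10; // PIG_INTERSTORAGE_SYNCMARKER_SIZE_DEFAULT
        // Per the test comment, each record counts 1 tuple-type byte plus 1 byte per small integer:
        //  Alias B: one sync marker + 2 records of 3 bytes
        //  Alias C: one sync marker + 1 record of 2 bytes
        int expected = markerSize + 2 * (1 + 1 + 1)
                     + markerSize + 1 * (1 + 1);
        System.out.println(expected); // 28: the test expects the join to fail at 27 bytes and pass at 28
    }
}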

