Author: szita
Date: Mon May 22 19:46:08 2017
New Revision: 1795844

URL: http://svn.apache.org/viewvc?rev=1795844&view=rev
Log:
PIG-4748: DateTimeWritable forgets Chronology (szita)
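
For context, the removed readFields()/write() pair below kept only the zone offset in minutes, so a DateTime carrying a named zone came back with a fixed-offset chronology. A minimal standalone sketch of that behaviour, using plain Joda-Time only (the class name and sample instant are illustrative, not part of the patch):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class ZoneLossSketch {
        public static void main(String[] args) {
            // A DateTime in a named zone that observes DST.
            DateTime original = new DateTime("2014-06-01T00:00:00.000",
                    DateTimeZone.forID("America/New_York"));           // EDT, -04:00

            // What the old wire format preserved: millis plus offset in minutes.
            long millis = original.getMillis();
            int offsetMinutes = original.getZone().getOffset(original) / 60000;

            DateTime restored = new DateTime(millis,
                    DateTimeZone.forOffsetMillis(offsetMinutes * 60000));

            System.out.println(original.getZone());          // America/New_York
            System.out.println(restored.getZone());          // -04:00, fixed offset, no DST rules
            System.out.println(original.equals(restored));   // false: the chronology (zone) differs
        }
    }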

Added:
    pig/trunk/test/org/apache/pig/test/TestDateTime.java
Removed:
    pig/trunk/test/org/apache/pig/test/TestDefaultDateTimeZone.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/impl/PigImplConstants.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 22 19:46:08 2017
@@ -97,6 +97,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4748: DateTimeWritable forgets Chronology (szita)
+
 PIG-5229: TestPigTest.testSpecificOrderOutput and testSpecificOrderOutputForAlias failing (knoguchi)
 
 PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java Mon May 22 19:46:08 2017
@@ -21,20 +21,27 @@ package org.apache.pig.backend.hadoop;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.TreeSet;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
- * Writable for Double values.
+ * Writable for DateTime values.
  */
 public class DateTimeWritable implements WritableComparable {
-    
-    private static final int ONE_MINUTE = 60000;
 
+    private static List<String> availableZoneIDs = null;
     private DateTime value = null;
 
     public DateTimeWritable() {
@@ -46,12 +53,32 @@ public class DateTimeWritable implements
     }
 
     public void readFields(DataInput in) throws IOException {
-        value = new DateTime(in.readLong(), DateTimeZone.forOffsetMillis(in.readShort() * ONE_MINUTE));
+        retrieveAvailableZoneList();
+
+        long instant = in.readLong();
+        int offsetInMillis = in.readInt();
+        int zoneListPos = in.readInt();
+
+        DateTimeZone timeZone = null;
+        if (zoneListPos != -1){
+            timeZone = DateTimeZone.forID(availableZoneIDs.get(zoneListPos));
+        } else  {
+            timeZone = DateTimeZone.forOffsetMillis(offsetInMillis);
+        }
+
+        value = new DateTime(instant, timeZone);
     }
 
     public void write(DataOutput out) throws IOException {
+        retrieveAvailableZoneList();
+
+        String zoneId = value.getZone().getID();
+        int offsetInMillis = value.getZone().getOffset(0L);
+        int zoneListPos = availableZoneIDs.indexOf(zoneId);
+
         out.writeLong(value.getMillis());
-        out.writeShort(value.getZone().getOffset(value) / ONE_MINUTE);
+        out.writeInt(offsetInMillis);
+        out.writeInt(zoneListPos);
     }
 
     public void set(DateTime dt) {
@@ -62,6 +89,26 @@ public class DateTimeWritable implements
         return value;
     }
 
+    private void retrieveAvailableZoneList() throws IOException {
+        if (availableZoneIDs != null){
+            return;
+        }
+        Properties props = UDFContext.getUDFContext().getUDFProperties(PigImplConstants.PIG_DATETIME_ZONES_LIST.getClass());
+        Collection<String> zoneList = StringUtils.getStringCollection(props.getProperty(PigImplConstants.PIG_DATETIME_ZONES_LIST));
+        if (zoneList == null || zoneList.size() == 0){
+            throw new IOException("Datetime zone information not set");
+        }
+        availableZoneIDs = new ArrayList<>(zoneList);
+    }
+
+    public static void setupAvailableZoneIds() {
+        TreeSet<String> sortedZoneIDs = new TreeSet<>(DateTimeZone.getAvailableIDs());
+        Properties props = UDFContext.getUDFContext().getUDFProperties(
+                PigImplConstants.PIG_DATETIME_ZONES_LIST.getClass());
+        props.setProperty(PigImplConstants.PIG_DATETIME_ZONES_LIST, StringUtils.arrayToString(
+                sortedZoneIDs.toArray(new String[0])));
+    }
+
     /**
      * Returns true iff <code>o</code> is a DateTimeWritable with the same
      * value.

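For reference, a rough round-trip sketch of the new three-field format (millis, offset, zone-list index), modelled on the testDateTimeWritables case in the test added below; the class name is illustrative, and the Configuration/UDFContext setup mirrors the test, standing in for what the frontend does during job compilation:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.backend.hadoop.DateTimeWritable;
    import org.apache.pig.impl.util.UDFContext;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class DateTimeWritableRoundTrip {
        public static void main(String[] args) throws Exception {
            // Frontend responsibility (done by JobControlCompiler/TezDagBuilder in this patch):
            // publish the sorted zone-ID list through UDFContext before anything is written.
            DateTimeWritable.setupAvailableZoneIds();
            UDFContext.getUDFContext().addJobConf(new Configuration());

            DateTimeWritable in = new DateTimeWritable(
                    new DateTime("2017-02-02T15:19:00.000", DateTimeZone.forID("Europe/Budapest")));

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));      // writes long millis, int offset, int zone-list index

            DateTimeWritable out = new DateTimeWritable();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            // The named zone, and with it the chronology, survives the round trip.
            System.out.println(in.get().getZone() + " -> " + out.get().getZone());
            System.out.println(in.get().equals(out.get()));   // true
        }
    }
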
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon May 22 19:46:08 2017
@@ -70,6 +70,7 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.DateTimeWritable;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.PigJobControl;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -533,6 +534,9 @@ public class JobControlCompiler{
                      conf.get(MRConfiguration.JOB_REDUCE_MARKRESET_BUFFER_PERCENT));
         }
 
+
+        DateTimeWritable.setupAvailableZoneIds();
+
         configureCompression(conf);
 
         try{

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon May 22 19:46:08 2017
@@ -55,6 +55,7 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.DateTimeWritable;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
@@ -636,6 +637,8 @@ public class TezDagBuilder extends TezOp
         payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
         payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));
 
+        DateTimeWritable.setupAvailableZoneIds();
+
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
 

Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Mon May 22 19:46:08 2017
@@ -85,6 +85,11 @@ public class PigImplConstants {
      */
     public static final String PIG_AUDIT_ID = "pig.script.id";
 
+    /**
+     * Used to carry zone ID list from frontend to backend (generated by frontend during Job creation)
+     */
+    public static final String PIG_DATETIME_ZONES_LIST = "pig.datetime.zones.list";
+
     // Kill the jobs before cleaning up tmp files
     public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
     public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;

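This constant is the handoff point: the frontend joins the sorted zone IDs into one comma-separated property, and each task splits it back so the index serialized by write() resolves to the same zone ID in readFields(). A minimal sketch of that handoff, assuming a throwaway Properties object can stand in for the UDFContext-backed properties (class name and sample zone are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.TreeSet;

    import org.apache.hadoop.util.StringUtils;
    import org.joda.time.DateTimeZone;

    public class ZoneListHandoffSketch {
        public static void main(String[] args) {
            Properties props = new Properties();   // stand-in for the UDFContext UDF properties

            // Frontend side (what setupAvailableZoneIds() does):
            TreeSet<String> sorted = new TreeSet<>(DateTimeZone.getAvailableIDs());
            props.setProperty("pig.datetime.zones.list",
                    StringUtils.arrayToString(sorted.toArray(new String[0])));

            // Backend side (what retrieveAvailableZoneList() does):
            List<String> zones = new ArrayList<>(
                    StringUtils.getStringCollection(props.getProperty("pig.datetime.zones.list")));

            // Because both sides sort the IDs, an index written on one side
            // resolves to the same zone ID on the other.
            int idx = zones.indexOf("Europe/Budapest");
            System.out.println(idx + " -> " + zones.get(idx));
        }
    }
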
Added: pig/trunk/test/org/apache/pig/test/TestDateTime.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDateTime.java?rev=1795844&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDateTime.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestDateTime.java Mon May 22 19:46:08 2017
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.DateTimeWritable;
+import org.apache.pig.builtin.ToDate;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.util.UDFContext;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import static org.junit.Assert.assertEquals;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestDateTime {
+
+    private static MiniGenericCluster cluster;
+    private static PigServer pigServer;
+    private static PigServer pigServerLocal;
+    private static File tmpFile;
+    private static DateTimeZone currentDTZ;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        pigServerLocal = new PigServer(Util.getLocalTestMode(), new Properties());
+        currentDTZ = DateTimeZone.getDefault();
+
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        ps.println("1970-01-01T00:00:00.000");
+        ps.println("1970-01-01T00:00:00.000Z");
+        ps.println("1970-01-03T00:00:00.000");
+        ps.println("1970-01-03T00:00:00.000Z");
+        ps.println("1970-01-05T00:00:00.000");
+        ps.println("1970-01-05T00:00:00.000Z");
+        // for testing DST
+        ps.println("2014-02-01T00:00:00.000"); // EST
+        ps.println("2014-06-01T00:00:00.000"); // EDT
+        ps.close();
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+        tmpFile.delete();
+    }
+
+    @After
+    public void restoreDefaultTZ() throws Exception {
+        DateTimeZone.setDefault(currentDTZ);
+    }
+
+    @Before
+    public void cleanUpTmpFiles() throws Exception {
+        FileLocalizer.deleteTempFiles();
+    }
+
+    @Test
+    public void testDateTimeWritables() throws IOException {
+        Configuration jobConf = new Configuration();
+
+        DateTimeWritable.setupAvailableZoneIds();
+        UDFContext.getUDFContext().addJobConf(jobConf);
+
+        String[] testZones = new String[]{"America/Los_Angeles", "Europe/London", "Europe/Budapest", "Japan"};
+
+        DateTime testInputBase = DateTime.now();
+        for (String zoneId : testZones){
+            DateTimeZone customTZ = DateTimeZone.forID(zoneId);
+            //Test with timezone name as zone ID
+            testDateTimeForZone(new DateTime(testInputBase).withZoneRetainFields(customTZ));
+            //Test with offset as zone ID
+            testDateTimeForZone(new DateTime(testInputBase, DateTimeZone.forOffsetMillis(customTZ.getOffset(testInputBase))));
+        }
+
+    }
+
+    private void testDateTimeForZone(DateTime testIn) throws IOException {
+        DateTimeWritable in = new DateTimeWritable(testIn);
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream dataOut = new DataOutputStream(outputStream);
+        in.write(dataOut);
+        dataOut.flush();
+
+        // read from byte[]
+        DateTimeWritable out = new DateTimeWritable();
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(
+                outputStream.toByteArray());
+        DataInputStream dataIn = new DataInputStream(inputStream);
+        out.readFields(dataIn);
+
+        assertEquals(in.get(), out.get());
+    }
+
+    @Test
+    public void testLocalExecution() throws Exception {
+        Iterator<Tuple> expectedItr = generateExpectedResults(DateTimeZone
+                .forOffsetMillis(DateTimeZone.forID("+08:00").getOffset(null)));
+        pigServerLocal.getPigContext().getProperties().setProperty("pig.datetime.default.tz", "+08:00");
+        pigServerLocal.registerQuery("a = load '"
+                + Util.encodeEscape(Util.generateURI(tmpFile.toString(), pigServerLocal.getPigContext()))
+                + "' as (test:datetime);");
+        pigServerLocal.registerQuery("b = filter a by test < ToDate('1970-01-04T00:00:00.000');");
+        Iterator<Tuple> actualItr = pigServerLocal.openIterator("b");
+        while (expectedItr.hasNext() && actualItr.hasNext()) {
+            Tuple expectedTuple = expectedItr.next();
+            Tuple actualTuple = actualItr.next();
+            assertEquals(expectedTuple, actualTuple);
+        }
+        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+    }
+
+    /**
+     * Tests DateTimeWritables on cluster
+     * @throws Exception
+     * Below input will trigger DateTime instances with both long zone id (UTC) and short offset (+01:00)
+     */
+    @Test
+    public void testDateTimeZoneOnCluster() throws Exception {
+        String inputFileName = "testDateTime-input.txt";
+        String[] inputData = new String[]{  "1\t1990-01-04T12:30:00.000+01:00",
+                "2\t1990-01-04T11:30:00.000Z",
+                "3\t2001-01-01T01:00:00.000",
+                "4\t2017-02-02T15:19:00.000+01:00"
+        };
+        Util.createInputFile(cluster, inputFileName, inputData);
+
+        String script = "A = LOAD '"+inputFileName+"' AS (foo:int, sd:datetime);" +
+                "B = group A by sd;" +
+                "C = foreach B generate group, MAX(A.foo);";
+
+        Util.registerMultiLineQuery(pigServer, script);
+
+        Iterator<Tuple> it = pigServer.openIterator("C");
+
+        //Should return last 3 rows from input
+        String sysTZOffset = DateTimeZone.forOffsetMillis(DateTime.now().getZone().getOffset(0L)).toString();
+        Util.checkQueryOutputsAfterSortRecursive(
+                it,
+                new String[]{
+                        "(1990-01-04T11:30:00.000Z,2)",
+                        "(2001-01-01T01:00:00.000"+sysTZOffset+",3)",
+                        "(2017-02-02T15:19:00.000+01:00,4)"
+                },
+                org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+    }
+
+    @Test
+    public void testZoneDST() throws Exception {
+        String defaultDTZ = "America/New_York"; // a timezone that uses DST
+        pigServerLocal.getPigContext().getProperties().setProperty("pig.datetime.default.tz", defaultDTZ);
+        pigServerLocal.registerQuery("a = load '"
+                + Util.encodeEscape(Util.generateURI(tmpFile.toString(), pigServerLocal.getPigContext()))
+                + "' as (test:datetime);");
+        pigServerLocal.registerQuery("b = filter a by test > ToDate('2014-01-01T00:00:00.000');");
+        pigServerLocal.registerQuery("c = foreach b generate ToString(test, 'Z') as tz;");
+        Iterator<Tuple> actualItr = pigServerLocal.openIterator("c");
+
+        Tuple est = actualItr.next();
+        assertEquals(Util.buildTuple("-0500"), est);
+        Tuple edt = actualItr.next();
+        assertEquals(Util.buildTuple("-0400"), edt);
+    }
+
+    private static Iterator<Tuple> generateExpectedResults(DateTimeZone dtz)
+            throws Exception {
+        List<Tuple> expectedResults = new ArrayList<Tuple>();
+        expectedResults.add(Util.buildTuple(new DateTime(
+                "1970-01-01T00:00:00.000", dtz)));
+        expectedResults.add(Util.buildTuple(new DateTime(
+                "1970-01-01T00:00:00.000", DateTimeZone.UTC)));
+        expectedResults.add(Util.buildTuple(new DateTime(
+                "1970-01-03T00:00:00.000", dtz)));
+        expectedResults.add(Util.buildTuple(new DateTime(
+                "1970-01-03T00:00:00.000", DateTimeZone.UTC)));
+        return expectedResults.iterator();
+    }
+
+    @Test
+    public void testTimeZone() throws IOException {
+        // Usually set through "pig.datetime.default.tz"
+        String defaultDTZ = "+03:00";
+        DateTimeZone.setDefault(DateTimeZone.forID(defaultDTZ));
+        String[] inputs = {
+                "1970-01-01T00:00:00.000-08:00",
+                "1970-01-01T00:00",
+                "1970-01-01T00",
+                "1970-01-01T",
+                "1970-01T",
+                "1970T",
+                "1970-01-01T00:00-08:00",
+                "1970-01-01T00-05:00",
+                "1970-01-01T-08:00",
+                "1970-01T-08:00",
+                //"1970T+8:00", //Invalid format
+                "1970-01-01",
+                "1970-01",
+                "1970",
+        };
+        String[] expectedDTZOutputs = {
+                "-08:00",
+                defaultDTZ,
+                defaultDTZ,
+                defaultDTZ,
+                defaultDTZ,
+                defaultDTZ,
+                "-08:00",
+                "-05:00",
+                "-08:00",
+                "-08:00",
+                defaultDTZ,
+                defaultDTZ,
+                defaultDTZ
+        };
+        String[] expectedDTOutputs = {
+                "1970-01-01T00:00:00.000-08:00",
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000-08:00",
+                "1970-01-01T00:00:00.000-05:00",
+                "1970-01-01T00:00:00.000-08:00",
+                "1970-01-01T00:00:00.000-08:00",
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000" + defaultDTZ,
+                "1970-01-01T00:00:00.000" + defaultDTZ
+        };
+
+        for( int i = 0; i < inputs.length; i++ ) {
+            DateTimeZone dtz = ToDate.extractDateTimeZone( inputs[i] );
+            assertEquals( expectedDTZOutputs[i], dtz.toString() );
+            DateTime dt = ToDate.extractDateTime( inputs[i] );
+            assertEquals( expectedDTOutputs[i], dt.toString() );
+            System.out.println( "\"" + dt + "\"," );
+        }
+
+    }
+
+}

