Author: rohini
Date: Tue Dec 17 16:03:38 2013
New Revision: 1551595

URL: http://svn.apache.org/r1551595
Log:
OOZIE-1642 writeUTF 64k limit for counters (puru via rohini)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
    oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java?rev=1551595&r1=1551594&r2=1551595&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java Tue Dec 17 16:03:38 2013
@@ -51,6 +51,10 @@ public class LiteWorkflowInstance implem
     private static String ROOT = PATH_SEPARATOR;
     private static String TRANSITION_SEPARATOR = "#";
 
+    // Using unique string to indicate version. This is to make sure that it
+    // doesn't match with user data.
+    private static final String DATA_VERSION = "V==1";
+
     private static class NodeInstance {
         String nodeName;
         boolean started = false;
@@ -579,7 +583,7 @@ public class LiteWorkflowInstance implem
         dOut.writeInt(persistentVars.size());
         for (Map.Entry<String, String> entry : persistentVars.entrySet()) {
             dOut.writeUTF(entry.getKey());
-            dOut.writeUTF(entry.getValue());
+            writeStringAsBytes(entry.getValue(), dOut);
         }
     }
 
@@ -609,12 +613,34 @@ public class LiteWorkflowInstance implem
         int numVars = dIn.readInt();
         for (int x = 0; x < numVars; x++) {
             String vName = dIn.readUTF();
-            String vVal = dIn.readUTF();
+            String vVal = readBytesAsString(dIn);
             persistentVars.put(vName, vVal);
         }
         refreshLog();
     }
 
+    private void writeStringAsBytes(String value, DataOutput dOut) throws 
IOException {
+        if (value == null) {
+            dOut.writeUTF(null);
+            return;
+        }
+        dOut.writeUTF(DATA_VERSION);
+        byte[] data = value.getBytes("UTF-8");
+        dOut.writeInt(data.length);
+        dOut.write(data);
+    }
+
+    private String readBytesAsString(DataInput dIn) throws IOException {
+        String value = dIn.readUTF();
+        if (value != null && value.equals(DATA_VERSION)) {
+            int length = dIn.readInt();
+            byte[] data = new byte[length];
+            dIn.readFully(data);
+            value = new String(data, "UTF-8");
+        }
+        return value;
+    }
+
     @Override
     public Configuration getConf() {
         return conf;

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java?rev=1551595&r1=1551594&r2=1551595&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java Tue Dec 17 16:03:38 2013
@@ -18,6 +18,7 @@
 package org.apache.oozie.workflow.lite;
 
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.workflow.WorkflowException;
 import org.apache.oozie.workflow.WorkflowInstance;
@@ -756,6 +757,24 @@ public class TestLiteWorkflowLib extends
         assertEquals(WorkflowInstance.Status.SUCCEEDED, job.getStatus());
     }
 
+    public void testJobPersistanceMoreThan64K() throws WorkflowException {
+        LiteWorkflowApp def = new LiteWorkflowApp("wf", "<worklfow-app/>", new 
StartNodeDef(
+                TestControlNodeHandler.class, "one")).addNode(
+                new NodeDef("one", null, AsynchNodeHandler.class, 
Arrays.asList(new String[] { "end" }))).addNode(
+                new EndNodeDef("end", TestControlNodeHandler.class));
+
+        LiteWorkflowInstance job = new LiteWorkflowInstance(def, new 
XConfiguration(), "1");
+        // 100k
+        String value = RandomStringUtils.randomAlphanumeric(100 * 1024);
+        job.setVar("a", value);
+        assertEquals(WorkflowInstance.Status.PREP, job.getStatus());
+        assertEquals(value, job.getVar("a"));
+
+        byte[] array = WritableUtils.toByteArray(job);
+        job = WritableUtils.fromByteArray(array, LiteWorkflowInstance.class);
+        assertEquals(WorkflowInstance.Status.PREP, job.getStatus());
+        assertEquals(value, job.getVar("a"));
+    }
 
     public void testImmediateError() throws WorkflowException {
         LiteWorkflowApp workflowDef = new LiteWorkflowApp("testWf", 
"<worklfow-app/>",

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1551595&r1=1551594&r2=1551595&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Dec 17 16:03:38 2013
@@ -1,6 +1,7 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
-OOZIE-1641 oozie-audit.log - add remote IP. (puru via rohini)
+OOZIE-1642 writeUTF 64k limit for counters (puru via rohini)
+OOZIE-1641 oozie-audit.log - add remote IP (puru via rohini)
 OOZIE-1635 verifySlaElement in submitXCommand.java should get sla info from action child as well (bowenzhangusa via rohini)
 OOZIE-1575 Add functionality to submit sqoop jobs through http on oozie server side (bowenzhangusa via rkanter)
 OOZIE-1634 TestJavaActionExecutor#testUpdateConfForUberMode fails against Hadoop 2 (rkanter)


Reply via email to