Author: knoguchi
Date: Thu May 16 19:38:13 2024
New Revision: 1917770

URL: http://svn.apache.org/viewvc?rev=1917770&view=rev
Log:
PIG-5447: Add CachedIteratorTransform which was missed in the initial commit.

Added:
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CachedIteratorTransform.java

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CachedIteratorTransform.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CachedIteratorTransform.java?rev=1917770&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CachedIteratorTransform.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CachedIteratorTransform.java
 Thu May 16 19:38:13 2024
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.util.Iterator;
+
+abstract class CachedIteratorTransform<IN, OUT> extends 
IteratorTransform<IN,OUT> {
+
+    public CachedIteratorTransform(Iterator<IN> delegate) {
+        super(delegate);
+    }
+
+    private OUT cachedObject = null;
+
+    // in case transform returns a valid null, adding one more flag
+    // to determine if the result is cached or not
+    private boolean isCached = false;
+
+    private boolean endReached = false;
+
+    // If transform traverses the "delegate" iterator in certain
+    // condition (like in
+    // SkewedJoinConverter.ToValueFunction.Tuple2TransformIterable)
+    // there is no easy way to determine if this iterator has next
+    // item or not except to actually call the transform method.
+    @Override
+    public boolean hasNext() {
+        if( endReached ) {
+            return false;
+        }
+        if( !isCached ) {
+            try {
+                cachedObject = transform(delegate.next());
+                isCached = true;
+            } catch (java.util.NoSuchElementException ex) {
+                cachedObject = null;
+                isCached = false;
+                endReached = true;
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public OUT next() {
+        if( !isCached ) {
+            return transform(delegate.next());
+        } else {
+            OUT retObject = cachedObject;
+            cachedObject = null;
+            isCached = false;
+            return retObject;
+        }
+    }
+}


Reply via email to