This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac505b09 TEZ-4540: Reading proto data more than 2GB from multiple splits fails (#334) (Raghav Aggarwal reviewed by Laszlo Bodor)
0ac505b09 is described below

commit 0ac505b09d18b51cbe880e2804bee47b9ea6c329
Author: Raghav Aggarwal <raghavaggarwal03...@gmail.com>
AuthorDate: Thu Jun 20 19:56:30 2024 +0530

    TEZ-4540: Reading proto data more than 2GB from multiple splits fails (#334) (Raghav Aggarwal reviewed by Laszlo Bodor)
---
 .../logging/proto/ProtoMessageWritable.java        |  1 +
 .../proto/TestProtoHistoryLoggingService.java      | 47 ++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
index df5743660..63a1ebda0 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
@@ -98,5 +98,6 @@ public class ProtoMessageWritable<T extends MessageLite> implements Writable {
     }
     din.in = in;
     message = cin.readMessage(parser, ExtensionRegistry.getEmptyRegistry());
+    cin.resetSizeCounter();
   }
 }
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
index fd3154d90..4f24d30a8 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java
@@ -23,12 +23,14 @@ import static org.mockito.Mockito.when;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
+import com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -135,6 +137,51 @@ public class TestProtoHistoryLoggingService {
     scanner.close();
   }
 
+  @Test
+  public void testProtoMessageSizeReset() throws Exception {
+    // Confirms that ProtoMessageWritable#readFields calls cin.resetSizeCounter() (TEZ-4540).
+    ProtoHistoryLoggingService service = createService(false);
+    service.start();
+    TezDAGID dagId = TezDAGID.getInstance(appId, 0);
+    List<HistoryEventProto> protos = new ArrayList<>();
+    for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) {
+      protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent()));
+      service.handle(event);
+    }
+    service.stop();
+
+    TezProtoLoggers loggers = new TezProtoLoggers();
+    Assert.assertTrue(loggers.setup(service.getConfig(), clock));
+
+    // Verify dag events are logged.
+    DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
+    Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1");
+    try (ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath)) {
+      assertEventsRead(reader, protos, 1, protos.size());
+
+      int totalBytesRead = getTotalBytesRead(reader);
+      // The size counter is reset after every message, so it must be 0 here. Without the reset it
+      // accumulates across messages and trips CodedInputStream's size limit (the >2GB failure).
+      Assert.assertEquals(0, totalBytesRead);
+    }
+  }
+
+  private static int getTotalBytesRead(ProtoMessageReader<HistoryEventProto> reader)
+      throws NoSuchFieldException, IllegalAccessException {
+    // Reads reader.writable (a private field of ProtoMessageReader) via reflection.
+    Field writableField = ProtoMessageReader.class.getDeclaredField("writable");
+    writableField.setAccessible(true);
+    ProtoMessageWritable<?> writable = (ProtoMessageWritable<?>) writableField.get(reader);
+
+    // cin is a private field in ProtoMessageWritable.java
+    Field cinField = ProtoMessageWritable.class.getDeclaredField("cin");
+    cinField.setAccessible(true);
+    CodedInputStream cin = (CodedInputStream) cinField.get(writable);
+
+    // Goal is to get value of: reader.writable.cin.getTotalBytesRead()
+    return cin.getTotalBytesRead();
+  }
+
   @Test
   public void testServiceSplitEvents() throws Exception {
     ProtoHistoryLoggingService service = createService(true);

Reply via email to