Repository: tez Updated Branches: refs/heads/master 13dca15da -> 88bd5b9dc
TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/88bd5b9d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/88bd5b9d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/88bd5b9d Branch: refs/heads/master Commit: 88bd5b9dc3f08b3c3b361e5167c219ad3e59c53f Parents: 13dca15 Author: Jonathan Eagles <[email protected]> Authored: Wed May 11 16:32:01 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed May 11 16:32:01 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 8 + .../tez/runtime/api/impl/EventMetaData.java | 48 ++++++ .../apache/tez/runtime/api/impl/TezEvent.java | 117 +++++++++----- .../tez/runtime/api/impl/TestTezEvent.java | 152 +++++++++++++++++++ 4 files changed, 283 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dc9f4fe..62833dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization TEZ-3251. Allow ability to add custom counters to TaskRunner2Callable. TEZ-3250. TezTaskRunner2 should accept ExecutorService. TEZ-3245. Data race between addKnowInput and clearAndGetOnepartition of InputHost. @@ -34,6 +35,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization TEZ-3251. Allow ability to add custom counters to TaskRunner2Callable. TEZ-3250. TezTaskRunner2 should accept ExecutorService. TEZ-3193. Deadlock in AM during task commit request. @@ -467,6 +469,12 @@ TEZ-2003: Support for External services CHANGES TEZ-2735. rebase 08/21 TEZ-2736. Pre-merge: Update CHANGES.txt and version in branch. +Release 0.7.2: Unreleased + +INCOMPATIBLE CHANGES + +ALL CHANGES: + TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization Release 0.7.1: Unreleased http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java index 88cad47..0ee96af 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java @@ -19,11 +19,13 @@ package org.apache.tez.runtime.api.impl; import static com.google.common.base.Preconditions.checkNotNull; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import javax.annotation.Nullable; + import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringInterner; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -137,4 +139,50 @@ public class EventMetaData implements Writable { + ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID) + " }"; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((edgeVertexName == null) ? 0 : edgeVertexName.hashCode()); + result = prime + * result + + ((producerConsumerType == null) ? 0 : producerConsumerType.hashCode()); + result = prime * result + + ((taskAttemptID == null) ? 0 : taskAttemptID.hashCode()); + result = prime * result + + ((taskVertexName == null) ? 0 : taskVertexName.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + EventMetaData other = (EventMetaData) obj; + if (edgeVertexName == null) { + if (other.edgeVertexName != null) + return false; + } else if (!edgeVertexName.equals(other.edgeVertexName)) + return false; + if (producerConsumerType != other.producerConsumerType) + return false; + if (taskAttemptID == null) { + if (other.taskAttemptID != null) + return false; + } else if (!taskAttemptID.equals(other.taskAttemptID)) + return false; + if (taskVertexName == null) { + if (other.taskVertexName != null) + return false; + } else if (!taskVertexName.equals(other.taskVertexName)) + return false; + return true; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index b3ce8c4..e76bdbb 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -21,7 +21,9 @@ package org.apache.tez.runtime.api.impl; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.OutputStream; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Writable; import org.apache.tez.common.ProtoConverters; import org.apache.tez.common.TezConverterUtils; @@ -36,10 +38,10 @@ import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto; import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto; import org.apache.tez.runtime.api.events.EventProtos.RootInputDataInformationEventProto; import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto; -import org.apache.tez.runtime.api.events.InputFailedEvent; -import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent; @@ -49,6 +51,10 @@ import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttempt import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto; import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptKilledEventProto; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; + public class TezEvent implements Writable { private EventType eventType; @@ -58,7 +64,7 @@ public class TezEvent implements Writable { private EventMetaData sourceInfo; private EventMetaData destinationInfo; - + private long eventReceivedTime; public TezEvent() { @@ -67,7 +73,7 @@ public class TezEvent implements Writable { public TezEvent(Event event, EventMetaData sourceInfo) { this(event, sourceInfo, System.currentTimeMillis()); } - + public TezEvent(Event event, EventMetaData sourceInfo, long time) { this.event = event; this.eventReceivedTime = time; @@ -103,11 +109,11 @@ public class TezEvent implements Writable { public Event getEvent() { return event; } - + public void setEventReceivedTime(long eventReceivedTime) { // TODO save this.eventReceivedTime = eventReceivedTime; } - + public long getEventReceivedTime() { return eventReceivedTime; } @@ -145,67 +151,78 @@ public class TezEvent implements Writable { TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event; sEvt.write(out); } else { - byte[] eventBytes = null; + AbstractMessage message; switch (eventType) { case DATA_MOVEMENT_EVENT: - eventBytes = + message = ProtoConverters.convertDataMovementEventToProto( - (DataMovementEvent) event).toByteArray(); + (DataMovementEvent) event); break; case COMPOSITE_DATA_MOVEMENT_EVENT: - eventBytes = + message = ProtoConverters.convertCompositeDataMovementEventToProto( - (CompositeDataMovementEvent) event).toByteArray(); + (CompositeDataMovementEvent) event); break; case VERTEX_MANAGER_EVENT: - eventBytes = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event) - .toByteArray(); + message = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event); break; case INPUT_READ_ERROR_EVENT: InputReadErrorEvent ideEvt = (InputReadErrorEvent) event; - eventBytes = InputReadErrorEventProto.newBuilder() + message = InputReadErrorEventProto.newBuilder() .setIndex(ideEvt.getIndex()) .setDiagnostics(ideEvt.getDiagnostics()) .setVersion(ideEvt.getVersion()) - .build().toByteArray(); + .build(); break; case TASK_ATTEMPT_FAILED_EVENT: TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event; - eventBytes = TaskAttemptFailedEventProto.newBuilder() + message = TaskAttemptFailedEventProto.newBuilder() .setDiagnostics(tfEvt.getDiagnostics()) .setTaskFailureType(TezConverterUtils.failureTypeToProto(tfEvt.getTaskFailureType())) - .build().toByteArray(); + .build(); break; case TASK_ATTEMPT_KILLED_EVENT: TaskAttemptKilledEvent tkEvent = (TaskAttemptKilledEvent) event; - eventBytes = TaskAttemptKilledEventProto.newBuilder() - .setDiagnostics(tkEvent.getDiagnostics()).build().toByteArray(); + message = TaskAttemptKilledEventProto.newBuilder() + .setDiagnostics(tkEvent.getDiagnostics()).build(); break; case TASK_ATTEMPT_COMPLETED_EVENT: - eventBytes = TaskAttemptCompletedEventProto.newBuilder() - .build().toByteArray(); + message = TaskAttemptCompletedEventProto.newBuilder() + .build(); break; case INPUT_FAILED_EVENT: InputFailedEvent ifEvt = (InputFailedEvent) event; - eventBytes = InputFailedEventProto.newBuilder() + message = InputFailedEventProto.newBuilder() .setTargetIndex(ifEvt.getTargetIndex()) - .setVersion(ifEvt.getVersion()).build().toByteArray(); + .setVersion(ifEvt.getVersion()).build(); break; case ROOT_INPUT_DATA_INFORMATION_EVENT: - eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto( - (InputDataInformationEvent) event).toByteArray(); + message = ProtoConverters.convertRootInputDataInformationEventToProto( + (InputDataInformationEvent) event); break; case ROOT_INPUT_INITIALIZER_EVENT: - eventBytes = ProtoConverters - .convertRootInputInitializerEventToProto((InputInitializerEvent) event) - .toByteArray(); + message = ProtoConverters + .convertRootInputInitializerEventToProto((InputInitializerEvent) event); break; default: throw new TezUncheckedException("Unknown TezEvent" + ", type=" + eventType); } - out.writeInt(eventBytes.length); - out.write(eventBytes); + if (out instanceof OutputStream) { //DataOutputBuffer extends DataOutputStream + int serializedSize = message.getSerializedSize(); + out.writeInt(serializedSize); + int buffersize = serializedSize < CodedOutputStream.DEFAULT_BUFFER_SIZE ? serializedSize + : CodedOutputStream.DEFAULT_BUFFER_SIZE; + CodedOutputStream codedOut = CodedOutputStream.newInstance( + (OutputStream) out, buffersize); + message.writeTo(codedOut); + codedOut.flush(); + } else { + byte[] eventBytes = message.toByteArray(); + out.writeInt(eventBytes.length); + out.write(eventBytes); + } + } } @@ -222,36 +239,45 @@ public class TezEvent implements Writable { ((TaskStatusUpdateEvent)event).readFields(in); } else { int eventBytesLen = in.readInt(); - byte[] eventBytes = new byte[eventBytesLen]; - in.readFully(eventBytes); + byte[] eventBytes; + CodedInputStream input; + int startOffset = 0; + if (in instanceof DataInputBuffer) { + eventBytes = ((DataInputBuffer)in).getData(); + startOffset = ((DataInputBuffer) in).getPosition(); + } else { + eventBytes = new byte[eventBytesLen]; + in.readFully(eventBytes); + } + input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen); switch (eventType) { case DATA_MOVEMENT_EVENT: DataMovementEventProto dmProto = - DataMovementEventProto.parseFrom(eventBytes); + DataMovementEventProto.parseFrom(input); event = ProtoConverters.convertDataMovementEventFromProto(dmProto); break; case COMPOSITE_DATA_MOVEMENT_EVENT: - CompositeEventProto cProto = CompositeEventProto.parseFrom(eventBytes); + CompositeEventProto cProto = CompositeEventProto.parseFrom(input); event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto); break; case VERTEX_MANAGER_EVENT: - VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(eventBytes); + VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(input); event = ProtoConverters.convertVertexManagerEventFromProto(vmProto); break; case INPUT_READ_ERROR_EVENT: InputReadErrorEventProto ideProto = - InputReadErrorEventProto.parseFrom(eventBytes); + InputReadErrorEventProto.parseFrom(input); event = InputReadErrorEvent.create(ideProto.getDiagnostics(), ideProto.getIndex(), ideProto.getVersion()); break; case TASK_ATTEMPT_FAILED_EVENT: TaskAttemptFailedEventProto tfProto = - TaskAttemptFailedEventProto.parseFrom(eventBytes); + TaskAttemptFailedEventProto.parseFrom(input); event = new TaskAttemptFailedEvent(tfProto.getDiagnostics(), TezConverterUtils.failureTypeFromProto(tfProto.getTaskFailureType())); break; case TASK_ATTEMPT_KILLED_EVENT: - TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(eventBytes); + TaskAttemptKilledEventProto tkProto = TaskAttemptKilledEventProto.parseFrom(input); event = new TaskAttemptKilledEvent(tkProto.getDiagnostics()); break; case TASK_ATTEMPT_COMPLETED_EVENT: @@ -259,16 +285,16 @@ public class TezEvent implements Writable { break; case INPUT_FAILED_EVENT: InputFailedEventProto ifProto = - InputFailedEventProto.parseFrom(eventBytes); + InputFailedEventProto.parseFrom(input); event = InputFailedEvent.create(ifProto.getTargetIndex(), ifProto.getVersion()); break; case ROOT_INPUT_DATA_INFORMATION_EVENT: RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto - .parseFrom(eventBytes); + .parseFrom(input); event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto); break; case ROOT_INPUT_INITIALIZER_EVENT: - EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(eventBytes); + EventProtos.RootInputInitializerEventProto riiProto = EventProtos.RootInputInitializerEventProto.parseFrom(input); event = ProtoConverters.convertRootInputInitializerEventFromProto(riiProto); break; default: @@ -276,6 +302,13 @@ public class TezEvent implements Writable { throw new TezUncheckedException("Unexpected TezEvent" + ", type=" + eventType); } + if (in instanceof DataInputBuffer) { + // Skip so that position is updated + int skipped = in.skipBytes(eventBytesLen); + if (skipped != eventBytesLen) { + throw new TezUncheckedException("Expected to skip " + eventBytesLen + " bytes. Actually skipped = " + skipped); + } + } } } @@ -294,7 +327,7 @@ public class TezEvent implements Writable { } else { out.writeBoolean(false); } - } + } @Override public void readFields(DataInput in) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/88bd5b9d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java new file mode 100644 index 0000000..b39c4ed --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTezEvent.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.api.impl; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; +import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; + +public class TestTezEvent { + + @Test + public void testSerialization() throws IOException { + + ArrayList<TezEvent> events = new ArrayList<TezEvent>(); + + Configuration conf = new Configuration(true); + String confVal = RandomStringUtils.random(10000, true, true); + conf.set("testKey", confVal); + UserPayload payload = TezUtils.createUserPayloadFromConf(conf); + TezTaskAttemptID srcTAID = TezTaskAttemptID.getInstance( + TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 1000); + TezTaskAttemptID destTAID = TezTaskAttemptID.getInstance( + TezTaskID.fromString("task_1454468251169_866787_1_02_000000"), 2000); + EventMetaData srcInfo = new EventMetaData(EventProducerConsumerType.OUTPUT, + "v1", "v2", srcTAID); + EventMetaData destInfo = new EventMetaData(EventProducerConsumerType.OUTPUT, + "v3", "v4", destTAID); + + // Case of size less than 4K and parsing skipped during deserialization + events.add(new TezEvent(new TaskAttemptCompletedEvent(), new EventMetaData( + EventProducerConsumerType.PROCESSOR, "v1", "v2", srcTAID))); + TezEvent dmeEvent = new TezEvent(DataMovementEvent.create(1000, 3, 1, + payload.getPayload()), srcInfo, System.currentTimeMillis()); + dmeEvent.setDestinationInfo(destInfo); + events.add(dmeEvent); + // Different code path + events.add(new TezEvent(new TaskStatusUpdateEvent(null, 0.1f, null, false), + new EventMetaData(EventProducerConsumerType.PROCESSOR, "v5", "v6", + srcTAID))); + + // Serialize to different types of DataOutput + // One that implements OutputStream and one that does not + DataOutputBuffer dataout = new DataOutputBuffer(); + ByteArrayDataOutput bout = ByteStreams.newDataOutput(); + serializeEvents(events, dataout); + serializeEvents(events, bout); + + // Deserialize from different types of DataInput + // One with DataInputBuffer and another different implementation + DataInputBuffer datain = new DataInputBuffer(); + datain.reset(dataout.getData(), dataout.getLength()); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataout.getData(), 0, dataout.getLength())); + ArrayList<TezEvent> actual1 = deserializeEvents(datain); + ArrayList<TezEvent> actual2 = deserializeEvents(dis); + assertEventEquals(events, actual1); + assertEventEquals(events, actual2); + + byte[] serializedBytes = bout.toByteArray(); + datain.reset(serializedBytes, serializedBytes.length); + dis = new DataInputStream(new ByteArrayInputStream(serializedBytes)); + actual1 = deserializeEvents(datain); + actual2 = deserializeEvents(dis); + assertEventEquals(events, actual1); + assertEventEquals(events, actual2); + + } + + private void serializeEvents(ArrayList<TezEvent> events, DataOutput out) throws IOException { + out.writeInt(events.size()); + for (TezEvent e : events) { + e.write(out); + } + } + + private ArrayList<TezEvent> deserializeEvents(DataInput in) throws IOException { + int eventsCount = in.readInt(); + ArrayList<TezEvent> events = new ArrayList<TezEvent>(eventsCount); + for (int i = 0; i < eventsCount; ++i) { + TezEvent e = new TezEvent(); + e.readFields(in); + events.add(e); + } + return events; + } + + private void assertEventEquals(ArrayList<TezEvent> expectedList, ArrayList<TezEvent> actualList) { + Assert.assertEquals(expectedList.size(), actualList.size()); + for (int i = 0; i < expectedList.size(); i++) { + TezEvent expected = expectedList.get(i); + TezEvent actual = actualList.get(i); + Assert.assertEquals(expected.getEventReceivedTime(), actual.getEventReceivedTime()); + Assert.assertEquals(expected.getSourceInfo(), actual.getSourceInfo()); + Assert.assertEquals(expected.getDestinationInfo(), actual.getDestinationInfo()); + Assert.assertEquals(expected.getEventType(), actual.getEventType()); + // Doing this instead of implementing equals methods for events + if (i == 0) { + Assert.assertTrue(actual.getEvent() instanceof TaskAttemptCompletedEvent); + } else if (i == 1) { + DataMovementEvent dmeExpected = (DataMovementEvent) expected.getEvent(); + DataMovementEvent dmeActual = (DataMovementEvent) actual.getEvent(); + Assert.assertEquals(dmeExpected.getSourceIndex(), dmeActual.getSourceIndex()); + Assert.assertEquals(dmeExpected.getTargetIndex(), dmeActual.getTargetIndex()); + Assert.assertEquals(dmeExpected.getVersion(), dmeActual.getVersion()); + Assert.assertEquals(dmeExpected.getUserPayload(), dmeActual.getUserPayload()); + } else { + TaskStatusUpdateEvent tsuExpected = (TaskStatusUpdateEvent) expected.getEvent(); + TaskStatusUpdateEvent tsuActual = (TaskStatusUpdateEvent) actual.getEvent(); + Assert.assertEquals(tsuExpected.getCounters(), tsuActual.getCounters()); + Assert.assertEquals(tsuExpected.getProgress(), tsuActual.getProgress(), 0); + Assert.assertEquals(tsuExpected.getProgressNotified(), tsuActual.getProgressNotified()); + Assert.assertEquals(tsuExpected.getStatistics(), tsuActual.getStatistics()); + } + } + } + +}
