phet commented on a change in pull request #3436:
URL: https://github.com/apache/gobblin/pull/3436#discussion_r755489266
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobExecutionEventSubmitter.java
##########
@@ -77,6 +78,8 @@ private void submitJobStateEvent(JobState jobState) {
jobMetadataBuilder.put(METADATA_JOB_COMPLETED_TASKS,
Integer.toString(jobState.getCompletedTasks()));
jobMetadataBuilder.put(METADATA_JOB_LAUNCHER_TYPE,
jobState.getLauncherType().toString());
jobMetadataBuilder.put(METADATA_JOB_TRACKING_URL,
jobState.getTrackingURL().or(UNKNOWN_VALUE));
+ jobMetadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, ""));
+ jobMetadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, ""));
Review comment:
sounds reasonable
##########
File path:
gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
##########
@@ -241,6 +244,14 @@ public int hashCode() {
addLineageSourceInfo(state, sourceEntity, workunit);
partition.serialize(workunit);
workUnits.add(workunit);
+ highestWaterMark = highestWaterMark.isPresent() ?
+ Optional.of(Math.max(highestWaterMark.get(), partition.getHighWatermark())) : Optional.of(partition.getHighWatermark());
Review comment:
`Optional.map` would be more idiomatic
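A minimal sketch of what that suggestion could look like, assuming `highestWaterMark` is a `java.util.Optional<Long>` (if it is actually Guava's `Optional`, the closest equivalent is `transform` rather than `map`). The class and the `foldMax` helper are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.util.Optional;

public class HighWatermarkSketch {

  // Hypothetical helper, not from the PR: keeps the larger of the running
  // maximum (if any) and the candidate watermark.
  static Optional<Long> foldMax(Optional<Long> current, long candidate) {
    // map() applies Math.max only when a value is present;
    // orElse() falls back to the candidate when it is not.
    return Optional.of(current.map(w -> Math.max(w, candidate)).orElse(candidate));
  }

  public static void main(String[] args) {
    Optional<Long> highest = Optional.empty();
    // Stand-in values for partition.getHighWatermark() across partitions.
    for (long hw : new long[] {20L, 50L, 30L}) {
      highest = foldMax(highest, hw);
    }
    System.out.println(highest.get()); // prints 50
  }
}
```

This removes the explicit `isPresent()`/`get()` pair while keeping the same result as the ternary in the diff.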
##########
File path:
gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
##########
@@ -168,4 +168,37 @@ public void testRunDuration() throws DataRecordException, IOException {
Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000),
"Time spent " + timeSpentMicro);
}
+
+ @Test
+ public void testThrowException() throws DataRecordException, IOException {
+ final int MEM_ALLOC_BYTES = 100;
+ final int NUM_WORK_UNITS = 1;
+ final int SLEEP_TIME_MICRO = 1000;
+ final int NUM_RECORDS = 30; // this config is ignored since the duration is set
+ final int RUN_DURATION_SECS = 5;
+
+ SourceState state = new SourceState();
+ state.setProp(StressTestingSource.NUM_WORK_UNITS_KEY, NUM_WORK_UNITS);
+ state.setProp(StressTestingSource.MEM_ALLOC_BYTES_KEY, MEM_ALLOC_BYTES);
+ state.setProp(StressTestingSource.SLEEP_TIME_MICRO_KEY, SLEEP_TIME_MICRO);
+ state.setProp(StressTestingSource.NUM_RECORDS_KEY, NUM_RECORDS);
+ state.setProp(StressTestingSource.RUN_DURATION_KEY, RUN_DURATION_SECS);
+ state.setProp(StressTestingSource.THROW_EXCEPTION, true);
+
+ StressTestingSource source = new StressTestingSource();
+
+ List<WorkUnit> wus = source.getWorkunits(state);
+ Assert.assertEquals(wus.size(), NUM_WORK_UNITS);
+
+ WorkUnit wu = wus.get(0);
+ WorkUnitState wuState = new WorkUnitState(wu, state);
+ Extractor<String, byte[]> extractor = source.getExtractor(wuState);
+
+ Assert.expectThrows(IOException.class, () -> {
+ byte[] record;
+ while ((record = extractor.readRecord(null)) != null) {
+ Assert.assertEquals(record.length, 100);
Review comment:
I figured that with `THROW_EXCEPTION == true` it would throw on the first
`extractor.readRecord(null)` call. Therefore an `Assert.fail` in the loop body
immediately alerts us to begin debugging if that doesn't happen. It's also
self-documenting, so future maintainers know that the loop body should never run.
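
To make the idea concrete, here is a plain-Java sketch with no TestNG dependency. `RecordReader` and `throwsOnFirstRead` are hypothetical stand-ins for the extractor and for the `Assert.expectThrows` wrapper; in the real test the loop body would presumably be a call to `Assert.fail(...)` instead of the `AssertionError` thrown here.

```java
import java.io.IOException;

public class FailFastLoopSketch {

  // Hypothetical stand-in for Extractor.readRecord(null).
  interface RecordReader {
    byte[] readRecord() throws IOException;
  }

  // Returns true only if the very first read throws IOException.
  // If a record is ever returned, the loop body fails loudly instead of
  // silently asserting on a record that should not exist.
  static boolean throwsOnFirstRead(RecordReader reader) {
    try {
      while (reader.readRecord() != null) {
        throw new AssertionError("loop body should never run: expected IOException on first read");
      }
    } catch (IOException expected) {
      return true;
    }
    return false; // reader ended without throwing
  }

  public static void main(String[] args) {
    RecordReader alwaysThrows = () -> {
      throw new IOException("simulated failure");
    };
    System.out.println(throwsOnFirstRead(alwaysThrows)); // prints true
  }
}
```

With this shape, a reader that unexpectedly returns a record surfaces as an assertion failure rather than as a passing length check.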