shirshanka commented on a change in pull request #2860: [GOBBLIN-1015] Support
for direct Avro and Protobuf formats through Parquet writer
URL: https://github.com/apache/incubator-gobblin/pull/2860#discussion_r367061425
##########
File path:
gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
##########
@@ -169,28 +191,72 @@ public TestBatchExtractor(int partition,
this.numRecordsPerExtract = numRecordsPerExtract;
this.sleepTimePerRecord = sleepTimePerRecord;
this.workUnitState = wuState;
+ this.inMemoryFormat =
InMemoryFormat.valueOf(this.workUnitState.getProp(MEMORY_FORMAT_KEY));
+ this.schema = getSchema(inMemoryFormat);
}
@Override
- public String getSchema()
+ public Object getSchema()
throws IOException {
- return "";
+ return this.schema;
+ }
+
+ private Object getSchema(InMemoryFormat inMemoryFormat) {
+ switch (inMemoryFormat) {
+ case POJO: {
+ return TestRecord.class;
+ }
+ case AVRO: {
+ return org.apache.gobblin.test.avro.TestRecord.getClassSchema();
+ }
+ case PROTOBUF: {
+ return TestRecordProtos.TestRecord.class;
+ }
+ default:
+ throw new RuntimeException("Not implemented " +
inMemoryFormat.name());
+ }
}
@Override
- public Object readRecord(@Deprecated Object reuse)
+ public RecordEnvelope readRecordEnvelope()
throws DataRecordException, IOException {
if (recordsExtracted < numRecordsPerExtract) {
try {
Thread.sleep(sleepTimePerRecord);
} catch (InterruptedException e) {
Throwables.propagate(e);
}
- TestRecord record = new TestRecord(this.partition,
this.currentWatermark.getValue(), null);
+ Object record;
+ switch (this.inMemoryFormat) {
+ case POJO: {
+ record = new TestRecord(this.partition,
this.currentWatermark.getValue(), "I am a POJO message");
+ break;
+ }
+ case AVRO: {
+ record = org.apache.gobblin.test.avro.TestRecord.newBuilder()
+ .setPartition(this.partition)
+ .setSequence(this.currentWatermark.getValue())
+ .setPayload("I am an Avro message")
+ .build();
+ break;
+ }
+ case PROTOBUF: {
+ record = TestRecordProtos.TestRecord.newBuilder()
+ .setPartition(this.partition)
+ .setSequence(this.currentWatermark.getValue())
+ .setPayload("I am a Protobuf message")
+ .build();
+ break;
+ }
+ default: throw new RuntimeException("");
+ }
log.debug("Extracted record -> {}", record);
+ RecordEnvelope re = new RecordEnvelope<>((Object) record,
Review comment:
fixed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services