[https://issues.apache.org/jira/browse/BEAM-6480?focusedWorklogId=272621&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-272621]
ASF GitHub Bot logged work on BEAM-6480:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Jul/19 14:30
Start Date: 05/Jul/19 14:30
Worklog Time Spent: 10m
Work Description: iemejia commented on pull request #9005: [BEAM-6480] Adds AvroIO sink for generic records.
URL: https://github.com/apache/beam/pull/9005#discussion_r300704405
##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
##########
@@ -1117,6 +1122,68 @@ private void testDynamicDestinationsUnwindowedWithSharding(
         }
       case AVROIO_SINK:
+        {
+          FileIO.Write<String, IndexedRecord> write =
+              FileIO.<String, IndexedRecord>writeDynamic()
+                  .by(
+                      fn(
+                          (element, c) -> {
+                            c.sideInput(schemaView); // Ignore result
+                            return element.getSchema().getName().substring(0, 1);
+                          },
+                          requiresSideInputs(schemaView)))
+                  .via(
+                      fn(
+                          (dest, c) -> {
+                            Schema schema =
+                                new Schema.Parser().parse(c.sideInput(schemaView).get(dest));
+                            return AvroIO.sinkViaGeneric(schema);
+                          },
+                          requiresSideInputs(schemaView)))
+                  .to(baseDir.toString())
+                  .withNaming(
+                      fn(
+                          (dest, c) -> {
+                            c.sideInput(schemaView); // Ignore result
+                            return FileIO.Write.defaultNaming("file_" + dest, ".avro");
+                          },
+                          requiresSideInputs(schemaView)))
+                  .withTempDirectory(baseDir.toString())
+                  .withDestinationCoder(StringUtf8Coder.of())
+                  .withIgnoreWindowing();
+          switch (sharding) {
+            case RUNNER_DETERMINED:
+              break;
+            case WITHOUT_SHARDING:
+              write = write.withNumShards(1);
+              break;
+            case FIXED_3_SHARDS:
+              write = write.withNumShards(3);
+              break;
+            default:
+              throw new IllegalArgumentException("Unknown sharding " + sharding);
+          }
+
+          MapElements<String, IndexedRecord> formatter =
Review comment:
`toIndexedRecord`? (suggested name for the `formatter` variable)
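The destination, naming, and sharding rules exercised by the test above can be restated without Beam as a small plain-Java sketch. It assumes only what the diff shows: the destination key is the first character of each record's schema name, the file prefix is `"file_" + dest` (with a `.avro` suffix passed to `defaultNaming`), and the `sharding` switch maps to either no explicit shard count (`RUNNER_DETERMINED`) or a fixed one. The `Sharding` enum, `DestinationSketch` class, and its methods are hypothetical illustration names, not Beam APIs, and plain schema-name strings stand in for `IndexedRecord` elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical stand-in for the test's sharding parameter; mirrors the cases in the diff.
enum Sharding { RUNNER_DETERMINED, WITHOUT_SHARDING, FIXED_3_SHARDS }

final class DestinationSketch {
  private DestinationSketch() {}

  // Same rule as the .by(...) function: destination = first character of the schema name.
  static String destinationFor(String schemaName) {
    return schemaName.substring(0, 1);
  }

  // Prefix handed to FileIO.Write.defaultNaming(...) in the .withNaming(...) function.
  static String filePrefixFor(String dest) {
    return "file_" + dest;
  }

  // Mirrors the switch: empty means "let the runner decide", otherwise a fixed shard count.
  static OptionalInt numShards(Sharding sharding) {
    switch (sharding) {
      case RUNNER_DETERMINED:
        return OptionalInt.empty();
      case WITHOUT_SHARDING:
        return OptionalInt.of(1);
      case FIXED_3_SHARDS:
        return OptionalInt.of(3);
      default:
        throw new IllegalArgumentException("Unknown sharding " + sharding);
    }
  }

  // Groups schema names by destination key, the way writeDynamic() routes records.
  static Map<String, List<String>> groupByDestination(List<String> schemaNames) {
    Map<String, List<String>> groups = new TreeMap<>();
    for (String name : schemaNames) {
      groups.computeIfAbsent(destinationFor(name), k -> new ArrayList<>()).add(name);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> names = List.of("Apple", "Avocado", "Banana");
    System.out.println(groupByDestination(names));
    System.out.println(numShards(Sharding.FIXED_3_SHARDS));
  }
}
```

With these inputs, records whose schema names start with the same letter share one destination and therefore one `file_<dest>` prefix; the shard count only changes how many files each destination is split into.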
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 272621)
Time Spent: 1h (was: 50m)
> Add AvroIO.sink for IndexedRecord (FileIO compatible)
> -----------------------------------------------------
>
> Key: BEAM-6480
> URL: https://issues.apache.org/jira/browse/BEAM-6480
> Project: Beam
> Issue Type: New Feature
> Components: io-java-avro
> Affects Versions: 2.9.0
> Reporter: Romain Manni-Bucau
> Assignee: Ryan Skraba
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> More generally, a sink does not need its own mapper API: the preceding
> PTransform can always map elements into a format the sink supports, so any
> sink can assume its input is already in the right format.
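The argument above, that format conversion belongs in the upstream transform rather than in the sink, can be sketched in plain Java without Beam. Here a sink accepts only its native record type and exposes no formatter or mapper parameter; an ordinary mapping function applied before it does the conversion. `Person`, `RecordSink`, `MapThenSink`, and the map-based "record" are hypothetical illustration names, not Beam classes. In a Beam pipeline the mapping step would play the role of a `MapElements` transform applied before `FileIO.write()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical user type produced by an upstream step.
final class Person {
  final String name;
  final int age;

  Person(String name, int age) {
    this.name = name;
    this.age = age;
  }
}

// Hypothetical sink that only understands its native format: a field-name -> value map
// standing in for a generic record. It deliberately has no formatter/mapper parameter.
final class RecordSink {
  final List<Map<String, Object>> written = new ArrayList<>();

  void write(Map<String, Object> record) {
    written.add(record);
  }
}

final class MapThenSink {
  private MapThenSink() {}

  // The conversion lives in an ordinary function applied before the sink,
  // playing the role of a MapElements step upstream of the write.
  static final Function<Person, Map<String, Object>> TO_RECORD =
      p -> {
        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("name", p.name);
        rec.put("age", p.age);
        return rec;
      };

  // Generic pipeline step: map every element, then hand the result to the sink unchanged.
  static <T> void run(List<T> input, Function<T, Map<String, Object>> toRecord, RecordSink sink) {
    for (T element : input) {
      sink.write(toRecord.apply(element));
    }
  }

  public static void main(String[] args) {
    RecordSink sink = new RecordSink();
    run(List.of(new Person("ada", 36), new Person("alan", 41)), TO_RECORD, sink);
    System.out.println(sink.written);
  }
}
```

Because the sink only ever sees its native type, supporting a new input type means writing a new mapping function, not extending the sink's API.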
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)