I don't fully understand the input side of your pipeline, since it appears
to use a custom IO. I am fairly confident, though, that FileIO.write() will
never produce empty files, so the behavior you describe sounds expected if
you're reading in an empty file.

FileIO doesn't copy files, but rather is a container for read transforms
that read file contents into a PCollection and write transforms that batch
together records from a PCollection and write serialized contents to files.

I generally would not expect the FileIO machinery to be able to preserve
file structure or naming between input and output. If that's what you want,
then you may want to read file names and write your own ParDo to run S3
copy operations.
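
To make the "copy by file name" idea concrete, here is a minimal sketch of the
per-file step. It is not Beam or S3 code: it uses the local filesystem
(java.nio.file) as a stand-in so that it runs on its own, and the class and
method names (NamePreservingCopy, copyPreservingName, copyAll) are hypothetical.
In a real pipeline, something like copyPreservingName would be the body of your
DoFn, with the Files.copy call replaced by an S3 copy request.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NamePreservingCopy {

    /** Copies one file into destDir, keeping its original file name. */
    public static Path copyPreservingName(Path source, Path destDir) throws IOException {
        Files.createDirectories(destDir);
        // Resolve by name only, so the target name matches the source name.
        Path target = destDir.resolve(source.getFileName().toString());
        return Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
    }

    /** Lists the regular files in srcDir and copies each one; returns the copied paths. */
    public static List<Path> copyAll(Path srcDir, Path destDir) throws IOException {
        List<Path> sources;
        try (Stream<Path> files = Files.list(srcDir)) {
            sources = files.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        List<Path> copied = new ArrayList<>();
        for (Path source : sources) {
            copied.add(copyPreservingName(source, destDir));
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for the two buckets.
        Path src = Files.createTempDirectory("account-a");
        Path dst = Files.createTempDirectory("account-b");
        Files.write(src.resolve("MyTransformation.avro"), new byte[] {1, 2, 3});
        for (Path p : copyAll(src, dst)) {
            System.out.println("copied " + p.getFileName());
        }
    }
}
```

The point of the sketch is that the file name travels with each element, so
the output name is derived from the input name rather than chosen by the
writer's sharding and naming logic.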

On Sun, Mar 24, 2019 at 12:07 PM Carlos Baeza <[email protected]>
wrote:

> Hi Guys,
>
> I'm new to Apache Beam. In my current project we have the following scenario:
>
> - We run transformations via an Apache Beam pipeline on Amazon AWS (using a
> cluster with Spark and Hadoop). In the future we may produce big files.
> - The generated AVRO file should be stored from AWS Account A to
> AWS Account B in "S3://some_store". The process is started in Account A
> because the required data access layer is there.
> - The first experiment showed:
>         - An empty file is created (0 bytes -> MyTransformation.avro) in
> Account A
>         - After the process finishes, no file appears in Account B
> "S3://sone_store". The file is missing.
>
> The process defined in Account A looks like:
>
> ---
> final SimplePipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).as(SimplePipelineOptions.class);
> final Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(
>          SomeDataIO.read()
>              .withZookeeperQuorum(options.getZookeeperQuorum())
>              .withDataVersion(UUID.fromString(options.getVersionId()),
>                  options.getDataVersion())
>              .withView(DictionaryModelView
>                  .create(MODEL, new ProcessSomeData())));
>
> Join.innerJoin(someData, someData)
>      .apply(Values.create())
>      .apply(ParDo.of(new ExtractSomeData()))
>      .apply(FileIO.<String, KV<String, String>>writeDynamic()
>          .by(KV::getKey)
>          .via(fn, AvroIO.sink(AVFeature.class))
>          .withNaming("MyTransformation.avro")
>          .to("S3://sone_store")
>          .withNumShards(1)
>          .withDestinationCoder(StringUtf8Coder.of()));
>
> pipeline.run().waitUntilFinish();
>
> ---
>
> The class ProcessSomeData is responsible for extracting some data from our
> persistence layer and processing it.
>
> In testing, running from Account B, everything works fine: we can produce the
> AVRO file (34 KB) and store it in the Account B S3 store ->
> S3://some_store
> But when running in the cloud and starting the process from Account A, we
> lose the file (MyTransformation.avro, 0 bytes). -> The file has not been
> copied.
> The AWS S3 configuration in Account B gives full access to Account A.
>
> 1. Any idea what is going wrong?
> 2. Maybe FileIO.Write.to(...) is not able to store data across AWS
> accounts?
> 3. Should I write my own Java client to store the file in Account B?
> 4. Can FileIO copy a 0-byte file?
>
> Any help is appreciated.
>
> Many thanks in advance !
>
> Carlos
>
>
>
>
