[
https://issues.apache.org/jira/browse/BEAM-9951?focusedWorklogId=432912&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432912
]
ASF GitHub Bot logged work on BEAM-9951:
----------------------------------------
Author: ASF GitHub Bot
Created on: 14/May/20 00:10
Start Date: 14/May/20 00:10
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #11665:
URL: https://github.com/apache/beam/pull/11665#discussion_r424800224
##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package synthetic contains transforms for creating synthetic pipelines.
+// Synthetic pipelines are pipelines that simulate the behavior of possible
+// pipelines in order to test performance, splitting, liquid sharding, and
+// various other infrastructure used for running pipelines. This category of
+// tests is not concerned with the correctness of the elements themselves, but
+// needs to simulate transforms that output many elements throughout varying
+// pipeline shapes.
+package synthetic
+
+import (
+ "math/rand"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
+)
+
+// Source creates a synthetic source transform that emits randomly
+// generated KV<[]byte, []byte> elements.
+//
+// This transform accepts a PCollection of SourceConfig, where each SourceConfig
+// determines the synthetic source's behavior for that element.
+//
+// This transform outputs a PCollection of randomly generated
+// KV<[]byte, []byte> elements.
+func Source(s beam.Scope, col beam.PCollection) beam.PCollection {
+ s = s.Scope("synthetic.Source")
+
+ return beam.ParDo(s, &sourceFn{}, col)
+}
+
+// sourceFn is a splittable DoFn implementing behavior for synthetic
+// sources. For usage information, see Source.
+//
+// The sourceFn is expected to receive elements of type SourceConfig
+// and follow that config to determine its behavior when splitting and emitting
+// elements.
+type sourceFn struct {
+ rng *rand.Rand
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the number of elements to emit.
+func (fn *sourceFn) CreateInitialRestriction(config SourceConfig) offsetrange.Restriction {
+ return offsetrange.Restriction{
+ Start: 0,
+ End: int64(config.NumElements),
+ }
+}
+
+// SplitRestriction splits restrictions equally according to the number of
+// initial splits specified in SourceConfig. Each restriction output by this
+// method will contain at least one element, so the number of splits will not
+// exceed the number of elements.
+func (fn *sourceFn) SplitRestriction(config SourceConfig, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+ if config.InitialSplits <= 1 {
+ // Don't split, just return original restriction.
+ splits = append(splits, rest)
+ return splits
+ }
+
+ num := int64(config.InitialSplits)
Review comment:
+1 for a TODO for this PR. Thanks!
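For context, the equal-split behavior described in the SplitRestriction doc comment above could be sketched roughly as follows. This is not the code from the PR (the quoted diff is cut off before the split logic). `Restriction` here is a hypothetical local stand-in for `offsetrange.Restriction`, and `splitEvenly` is an illustrative helper name, so the sketch runs without the Beam SDK.

```go
package main

import "fmt"

// Restriction is a hypothetical stand-in for offsetrange.Restriction,
// covering the half-open range [Start, End).
type Restriction struct {
	Start, End int64
}

// splitEvenly divides rest into at most n contiguous pieces of nearly
// equal size. The split count is capped at the number of elements, so
// every piece of a non-empty restriction holds at least one element.
func splitEvenly(rest Restriction, n int64) []Restriction {
	size := rest.End - rest.Start
	if n <= 1 || size <= 1 {
		// Nothing to split; return the original restriction.
		return []Restriction{rest}
	}
	if n > size {
		n = size
	}
	splits := make([]Restriction, 0, n)
	for i := int64(0); i < n; i++ {
		// Integer boundaries at i*size/n spread any remainder across pieces.
		splits = append(splits, Restriction{
			Start: rest.Start + i*size/n,
			End:   rest.Start + (i+1)*size/n,
		})
	}
	return splits
}

func main() {
	for _, r := range splitEvenly(Restriction{Start: 0, End: 10}, 3) {
		fmt.Printf("[%d, %d)\n", r.Start, r.End)
	}
}
```

Capping the split count at the range size is one way to keep the "at least one element per restriction" guarantee from the doc comment; the PR may handle it differently.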
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 432912)
Time Spent: 20m (was: 10m)
> Create Go SDK synthetic sources.
> --------------------------------
>
> Key: BEAM-9951
> URL: https://issues.apache.org/jira/browse/BEAM-9951
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Daniel Oliveira
> Assignee: Daniel Oliveira
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Create synthetic sources for the Go SDK like
> [Java|https://github.com/apache/beam/tree/master/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic]
> and
> [Python|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/synthetic_pipeline.py]
> have.