[
https://issues.apache.org/jira/browse/BEAM-14304?focusedWorklogId=761099&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761099
]
ASF GitHub Bot logged work on BEAM-14304:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Apr/22 21:12
Start Date: 22/Apr/22 21:12
Worklog Time Spent: 10m
Work Description: lostluck commented on code in PR #17347:
URL: https://github.com/apache/beam/pull/17347#discussion_r853292786
##########
sdks/go/pkg/beam/io/parquetio/parquetio.go:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// This module contains all Go code used for Beam's SDKs. This file is placed
Review Comment:
Please add a blank line between the license header and the package doc.
##########
sdks/go/pkg/beam/io/parquetio/parquetio.go:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// This module contains all Go code used for Beam's SDKs. This file is placed
+// in this directory in order to cover the go code required for Java and Python
+// containers, as well as the entire Go SDK. Placing this file in the
repository
+// root is not possible because it causes conflicts with a pre-existing vendor
+// directory.
+package parquetio
+
+import (
+ "context"
+ "io/ioutil"
+ "reflect"
+ "strings"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+ "github.com/xitongsys/parquet-go-source/buffer"
+ "github.com/xitongsys/parquet-go/reader"
+ "github.com/xitongsys/parquet-go/writer"
+)
+
+func init() {
+ beam.RegisterFunction(expandFn)
+ beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem())
+ beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem())
+}
+
+// Read reads a set of files and returns lines as a PCollection<elem>
+// based on the internal avro schema of the file.
+// A type - reflect.TypeOf( YourType{} ) - with
+// JSON tags can be defined or if you wish to return the raw JSON string,
+// use - reflect.TypeOf("") -
+func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection {
+ s = s.Scope("parquetio.Read")
+ filesystem.ValidateScheme(glob)
+ return read(s, t, beam.Create(s, glob))
+}
+
+func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection
{
+ files := beam.ParDo(s, expandFn, col)
+ return beam.ParDo(s,
+ &parquetReadFn{Type: beam.EncodedType{T: t}},
+ files,
+ beam.TypeDefinition{Var: beam.XType, T: t},
+ )
+}
+
+func expandFn(ctx context.Context, glob string, emit func(string)) error {
+ if strings.TrimSpace(glob) == "" {
+ return nil // ignore empty string elements here
+ }
+
+ fs, err := filesystem.New(ctx, glob)
+ if err != nil {
+ return err
+ }
+ defer fs.Close()
+
+ files, err := fs.List(ctx, glob)
+ if err != nil {
+ return err
+ }
+ for _, filename := range files {
+ emit(filename)
+ }
+ return nil
+}
+
+type parquetReadFn struct {
+ Type beam.EncodedType
+}
+
+func (a *parquetReadFn) ProcessElement(ctx context.Context, filename string,
emit func(beam.X)) error {
+ fs, err := filesystem.New(ctx, filename)
+ if err != nil {
+ return err
+ }
+ defer fs.Close()
+
+ fd, err := fs.OpenRead(ctx, filename)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ data, err := ioutil.ReadAll(fd)
+ if err != nil {
+ return err
+ }
+
+ bufferReader := buffer.NewBufferFileFromBytes(data)
+ parquetReader, err := reader.NewParquetReader(bufferReader,
reflect.New(a.Type.T).Interface(), 4)
+ if err != nil {
+ return err
+ }
+
+ vals, err := parquetReader.ReadByNumber(int(parquetReader.GetNumRows()))
+ if err != nil {
+ return err
+ }
+ for _, v := range vals {
+ emit(v)
+ }
+
+ return nil
+}
+
+func Write(s beam.Scope, filename string, t reflect.Type, col
beam.PCollection) {
Review Comment:
Please add a doc comment here, making it explicit that this writes only a
single file.
##########
sdks/go/pkg/beam/io/parquetio/parquetio_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// This module contains all Go code used for Beam's SDKs. This file is placed
+// in this directory in order to cover the go code required for Java and Python
+// containers, as well as the entire Go SDK. Placing this file in the
repository
+// root is not possible because it causes conflicts with a pre-existing vendor
+// directory.
+package parquetio
Review Comment:
The test files don't need a package doc, and the license header should be
separated from the `package` line by a blank line so that it isn't interpreted
as the package doc.
```suggestion
// limitations under the License.

package parquetio
```
Issue Time Tracking
-------------------
Worklog Id: (was: 761099)
Time Spent: 50m (was: 40m)
> Implement parquetio for Go SDK
> ------------------------------
>
> Key: BEAM-14304
> URL: https://issues.apache.org/jira/browse/BEAM-14304
> Project: Beam
> Issue Type: New Feature
> Components: sdk-go
> Reporter: Nguyen Khoi Nguyen
> Priority: P2
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The naive approach would be to read the whole Parquet file into memory,
> because processing Parquet files requires an io.Seeker.
> Alternatively, the filesystem.go interface could be extended to return an
> io.ReadSeekCloser, but that would not be trivial for GCS.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)