[ 
https://issues.apache.org/jira/browse/BEAM-14304?focusedWorklogId=761099&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761099
 ]

ASF GitHub Bot logged work on BEAM-14304:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Apr/22 21:12
            Start Date: 22/Apr/22 21:12
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17347:
URL: https://github.com/apache/beam/pull/17347#discussion_r853292786


##########
sdks/go/pkg/beam/io/parquetio/parquetio.go:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// This module contains all Go code used for Beam's SDKs. This file is placed

Review Comment:
   Please add a blank line between the license header and the package doc comment. 



##########
sdks/go/pkg/beam/io/parquetio/parquetio.go:
##########
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// This module contains all Go code used for Beam's SDKs. This file is placed
+// in this directory in order to cover the go code required for Java and Python
+// containers, as well as the entire Go SDK. Placing this file in the repository
+// root is not possible because it causes conflicts with a pre-existing vendor
+// directory.
+package parquetio
+
+import (
+       "context"
+       "io/ioutil"
+       "reflect"
+       "strings"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       "github.com/xitongsys/parquet-go-source/buffer"
+       "github.com/xitongsys/parquet-go/reader"
+       "github.com/xitongsys/parquet-go/writer"
+)
+
+func init() {
+       beam.RegisterFunction(expandFn)
+       beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem())
+       beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem())
+}
+
+// Read reads a set of files and returns lines as a PCollection<elem>
+// based on the internal avro schema of the file.
+// A type - reflect.TypeOf( YourType{} ) -  with
+// JSON tags can be defined or if you wish to return the raw JSON string,
+// use - reflect.TypeOf("") -
+func Read(s beam.Scope, glob string, t reflect.Type) beam.PCollection {
+       s = s.Scope("parquetio.Read")
+       filesystem.ValidateScheme(glob)
+       return read(s, t, beam.Create(s, glob))
+}
+
+func read(s beam.Scope, t reflect.Type, col beam.PCollection) beam.PCollection {
+       files := beam.ParDo(s, expandFn, col)
+       return beam.ParDo(s,
+               &parquetReadFn{Type: beam.EncodedType{T: t}},
+               files,
+               beam.TypeDefinition{Var: beam.XType, T: t},
+       )
+}
+
+func expandFn(ctx context.Context, glob string, emit func(string)) error {
+       if strings.TrimSpace(glob) == "" {
+               return nil // ignore empty string elements here
+       }
+
+       fs, err := filesystem.New(ctx, glob)
+       if err != nil {
+               return err
+       }
+       defer fs.Close()
+
+       files, err := fs.List(ctx, glob)
+       if err != nil {
+               return err
+       }
+       for _, filename := range files {
+               emit(filename)
+       }
+       return nil
+}
+
+type parquetReadFn struct {
+       Type beam.EncodedType
+}
+
+func (a *parquetReadFn) ProcessElement(ctx context.Context, filename string, emit func(beam.X)) error {
+       fs, err := filesystem.New(ctx, filename)
+       if err != nil {
+               return err
+       }
+       defer fs.Close()
+
+       fd, err := fs.OpenRead(ctx, filename)
+       if err != nil {
+               return err
+       }
+       defer fd.Close()
+
+       data, err := ioutil.ReadAll(fd)
+       if err != nil {
+               return err
+       }
+
+       bufferReader := buffer.NewBufferFileFromBytes(data)
+       parquetReader, err := reader.NewParquetReader(bufferReader, reflect.New(a.Type.T).Interface(), 4)
+       if err != nil {
+               return err
+       }
+
+       vals, err := parquetReader.ReadByNumber(int(parquetReader.GetNumRows()))
+       if err != nil {
+               return err
+       }
+       for _, v := range vals {
+               emit(v)
+       }
+
+       return nil
+}
+
+func Write(s beam.Scope, filename string, t reflect.Type, col beam.PCollection) {

Review Comment:
   Please add a doc comment here, making it explicit that this writes only a single file.



##########
sdks/go/pkg/beam/io/parquetio/parquetio_test.go:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// This module contains all Go code used for Beam's SDKs. This file is placed
+// in this directory in order to cover the go code required for Java and Python
+// containers, as well as the entire Go SDK. Placing this file in the repository
+// root is not possible because it causes conflicts with a pre-existing vendor
+// directory.
+package parquetio

Review Comment:
   The test files don't need a package doc. Also, the license header should be separated from the `package` line by a blank line, so that it isn't interpreted as the package doc comment.
   
   ```suggestion
   // limitations under the License.
   
   package parquetio
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 761099)
    Time Spent: 50m  (was: 40m)

> Implement parquetio for Go SDK
> ------------------------------
>
>                 Key: BEAM-14304
>                 URL: https://issues.apache.org/jira/browse/BEAM-14304
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: Nguyen Khoi Nguyen
>            Priority: P2
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> The naive approach would be to read the whole Parquet file into memory, 
> because processing Parquet files requires an io.Seeker.
> Alternatively, the filesystem.go interface could be extended to return an 
> io.ReadSeekCloser, but that would not be trivial for GCS.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to