[ 
https://issues.apache.org/jira/browse/BEAM-4359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sivanand updated BEAM-4359:
---------------------------
    Description: 
The bug is here:

[https://github.com/apache/beam/blob/3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java#L231-L235]
{code:java}
case STRING: {
        String str = value.getString();
        VarInt.encode(str.length(), bos);
        bos.write(str.getBytes(StandardCharsets.UTF_8));
        break;
}
{code}
 

The code assumes that the number of bytes used to represent a UTF-8 String 
equals the string length. This is not true because a UTF-8 character can be 
encoded using 1 - 4 bytes.

>From wikipedia: [https://en.wikipedia.org/wiki/UTF-8]
{quote}UTF-8 is a variable width character encoding capable of encoding all 
1,112,064 valid code points in Unicode using one to four 8-bit bytes
{quote}
Code to recreate the issue:
{code:java}
/*
Schema in spanner
CREATE TABLE test (
  id INT64,
  testString STRING(MAX),
  number INT64,
) PRIMARY KEY (id)
*/

    import com.google.cloud.spanner.Mutation;
    import com.google.common.collect.Lists;
    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.junit.Rule;
    import org.junit.Test;
    
    import java.io.Serializable;
    import java.util.List;
    
    public class BeamSpannerTest implements Serializable {
    
        @Rule
        public transient TestPipeline pipeline = TestPipeline.create();
    
        @Test
        public void testSpanner() {
            pipeline.getOptions().setRunner(DirectRunner.class);
    
            List<String> strdata = Lists.newArrayList("၃7");
    
    
            pipeline.apply(
                Create.of(strdata)
            ).apply(ParDo.of(new DoFn<String, Mutation>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String value = c.element();
                    c.output(Mutation.newInsertOrUpdateBuilder("test")
                        .set("id").to(1)
                        .set("testString").to(value)
                        .set("number").to(10)
                        .build());
                }
            })
           ).apply("Write to Spanner", SpannerIO.write()
                    .withProjectId("my-project")
                    .withInstanceId("spanner-instance")
                    .withDatabaseId("test")
            );
    
            pipeline.run();
        }
    }
{code}
After running the code, the value in the column {{number}} will be {{7043}} and 
not {{10}} because the bytes from the previous column {{testString}} have 
spilled into the {{number}}

  was:
The bug is here:

https://github.com/apache/beam/blob/3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java#L231-L235
{code}
case STRING: {
        String str = value.getString();
        VarInt.encode(str.length(), bos);
        bos.write(str.getBytes(StandardCharsets.UTF_8));
        break;
}
{code} 

The code assumes that the number of bytes used to represent a UTF_8 String 
equals the string length. This is not true because a UTF_8 character can be 
encoded using 1 - 4 bytes.

>From wikipedia: https://en.wikipedia.org/wiki/UTF-8
{quote}UTF-8 is a variable width character encoding capable of encoding all 
1,112,064 valid code points in Unicode using one to four 8-bit bytes{quote}

Code to recreate the issue:
{code}
/*
Schema in spanner
CREATE TABLE test (
  id INT64,
  testString STRING(MAX),
  number INT64,
) PRIMARY KEY (id)
*/

    import com.google.cloud.spanner.Mutation;
    import com.google.common.collect.Lists;
    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.junit.Rule;
    import org.junit.Test;
    
    import java.io.Serializable;
    import java.util.List;
    
    public class BeamSpannerTest implements Serializable {
    
        @Rule
        public transient TestPipeline pipeline = TestPipeline.create();
    
        @Test
        public void testSpanner() {
            pipeline.getOptions().setRunner(DirectRunner.class);
    
            List<String> strdata = Lists.newArrayList("၃7");
    
    
            pipeline.apply(
                Create.of(strdata)
            ).apply(ParDo.of(new DoFn<String, Mutation>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String value = c.element();
                    c.output(Mutation.newInsertOrUpdateBuilder("test")
                        .set("id").to(1)
                        .set("testString").to(value)
                        .set("number").to(10)
                        .build());
                }
            })
           ).apply("Write to Spanner", SpannerIO.write()
                    .withProjectId("my-project")
                    .withInstanceId("spanner-instance")
                    .withDatabaseId("test")
            );
    
            pipeline.run();
        }
    }
{code}

After running the code, the value in the column {{number}} will be {{7043}} and 
not {{10}} because the bytes from the previous column {{testString}} have 
spilled into the {{number}}



> String encoding for a spanner mutation assumes that string length equals 
> bytes length
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-4359
>                 URL: https://issues.apache.org/jira/browse/BEAM-4359
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.4.0
>            Reporter: Sivanand
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> The bug is here:
> [https://github.com/apache/beam/blob/3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java#L231-L235]
> {code:java}
> case STRING: {
>         String str = value.getString();
>         VarInt.encode(str.length(), bos);
>         bos.write(str.getBytes(StandardCharsets.UTF_8));
>         break;
> }
> {code}
>  
> The code assumes that the number of bytes used to represent a UTF-8 String 
> equals the string length. This is not true because a UTF-8 character can be 
> encoded using 1 - 4 bytes.
> From wikipedia: [https://en.wikipedia.org/wiki/UTF-8]
> {quote}UTF-8 is a variable width character encoding capable of encoding all 
> 1,112,064 valid code points in Unicode using one to four 8-bit bytes
> {quote}
> Code to recreate the issue:
> {code:java}
> /*
> Schema in spanner
> CREATE TABLE test (
>   id INT64,
>   testString STRING(MAX),
>   number INT64,
> ) PRIMARY KEY (id)
> */
>     import com.google.cloud.spanner.Mutation;
>     import com.google.common.collect.Lists;
>     import org.apache.beam.runners.direct.DirectRunner;
>     import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
>     import org.apache.beam.sdk.testing.TestPipeline;
>     import org.apache.beam.sdk.transforms.Create;
>     import org.apache.beam.sdk.transforms.DoFn;
>     import org.apache.beam.sdk.transforms.ParDo;
>     import org.junit.Rule;
>     import org.junit.Test;
>     
>     import java.io.Serializable;
>     import java.util.List;
>     
>     public class BeamSpannerTest implements Serializable {
>     
>         @Rule
>         public transient TestPipeline pipeline = TestPipeline.create();
>     
>         @Test
>         public void testSpanner() {
>             pipeline.getOptions().setRunner(DirectRunner.class);
>     
>             List<String> strdata = Lists.newArrayList("၃7");
>     
>     
>             pipeline.apply(
>                 Create.of(strdata)
>             ).apply(ParDo.of(new DoFn<String, Mutation>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     String value = c.element();
>                     c.output(Mutation.newInsertOrUpdateBuilder("test")
>                         .set("id").to(1)
>                         .set("testString").to(value)
>                         .set("number").to(10)
>                         .build());
>                 }
>             })
>            ).apply("Write to Spanner", SpannerIO.write()
>                     .withProjectId("my-project")
>                     .withInstanceId("spanner-instance")
>                     .withDatabaseId("test")
>             );
>     
>             pipeline.run();
>         }
>     }
> {code}
> After running the code, the value in the column {{number}} will be {{7043}} 
> and not {{10}} because the bytes from the previous column {{testString}} have 
> spilled into the {{number}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to