Sivanand created BEAM-4359:
------------------------------

             Summary: String encoding for a spanner mutation assumes that string length equals bytes length
                 Key: BEAM-4359
                 URL: https://issues.apache.org/jira/browse/BEAM-4359
             Project: Beam
          Issue Type: Bug
          Components: io-java-gcp
    Affects Versions: 2.4.0
            Reporter: Sivanand
            Assignee: Chamikara Jayalath


The bug is here:

https://github.com/apache/beam/blob/3ba96003d31ce98a54c0c51c1c0a9cf7c06e2fa2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroupEncoder.java#L231-L235
{code}
case STRING: {
        String str = value.getString();
        VarInt.encode(str.length(), bos);
        bos.write(str.getBytes(StandardCharsets.UTF_8));
        break;
}
{code} 

The code writes {{str.length()}} as the length prefix and then writes the 
UTF-8 bytes of the string. The two differ as soon as the string contains a 
non-ASCII character: {{String.length()}} counts UTF-16 code units, while a 
single character takes 1 to 4 bytes in UTF-8.

From Wikipedia: https://en.wikipedia.org/wiki/UTF-8
{quote}UTF-8 is a variable width character encoding capable of encoding all 
1,112,064 valid code points in Unicode using one to four 8-bit bytes{quote}
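
A minimal, Beam-free sketch of the mismatch and of one possible fix, which is to encode the string first and use the byte count as the prefix. The class name {{Utf8LengthPrefix}} and the helper {{writeVarInt}} are hypothetical; {{writeVarInt}} is only a stand-in for Beam's {{VarInt.encode}} and is assumed to use a base-128 encoding with the low 7 bits first.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class Utf8LengthPrefix {

    // Hypothetical stand-in for Beam's VarInt.encode (assumed: base-128,
    // low 7 bits first, high bit set while more bytes follow).
    static void writeVarInt(int v, ByteArrayOutputStream out) {
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    // Length-prefixed string where the prefix is the UTF-8 byte count,
    // not String.length().
    static byte[] encodeString(String str) {
        byte[] utf8 = str.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(utf8.length, out);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // U+1043 followed by '7': the test value used in this report.
        String s = "\u1043" + "7";
        System.out.println(s.length());                                // 2 UTF-16 code units
        System.out.println(s.getBytes(StandardCharsets.UTF_8).length); // 4 UTF-8 bytes
        System.out.println(encodeString(s).length);                    // 5 = 1 prefix byte + 4 payload bytes
    }
}
```

With the prefix taken from the encoded byte array, a reader that consumes exactly that many bytes stays aligned with the next field.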

Code to recreate the issue:
{code}
/*
Schema in spanner
CREATE TABLE test (
  id INT64,
  testString STRING(MAX),
  number INT64,
) PRIMARY KEY (id)
*/

    import com.google.cloud.spanner.Mutation;
    import com.google.common.collect.Lists;
    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.junit.Rule;
    import org.junit.Test;
    
    import java.io.Serializable;
    import java.util.List;
    
    public class BeamSpannerTest implements Serializable {
    
        @Rule
        public transient TestPipeline pipeline = TestPipeline.create();
    
        @Test
        public void testSpanner() {
            pipeline.getOptions().setRunner(DirectRunner.class);
    
            List<String> strdata = Lists.newArrayList("၃7");
    
    
            pipeline.apply(
                Create.of(strdata)
            ).apply(ParDo.of(new DoFn<String, Mutation>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String value = c.element();
                    c.output(Mutation.newInsertOrUpdateBuilder("test")
                        .set("id").to(1)
                        .set("testString").to(value)
                        .set("number").to(10)
                        .build());
                }
            })
           ).apply("Write to Spanner", SpannerIO.write()
                    .withProjectId("my-project")
                    .withInstanceId("spanner-instance")
                    .withDatabaseId("test")
            );
    
            pipeline.run();
        }
    }
{code}

After running the code, the value in the column {{number}} is {{7043}}, not 
{{10}}, because the bytes of the previous column {{testString}} that exceed 
the declared length spill into {{number}}.
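
The value {{7043}} is consistent with the two leftover UTF-8 bytes being read as a variable-length integer. The sketch below works through that arithmetic. It assumes the decoder consumes exactly 2 bytes for the string (the written prefix) and then reads {{number}} as a base-128 varint with the low 7 bits first; the decoder is not shown in this report, so both points are assumptions, and {{LeftoverBytes}} and {{decodeVarInt}} are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

public class LeftoverBytes {

    // Decode a base-128 varint starting at offset: low 7 bits first,
    // high bit set means another byte follows.
    static long decodeVarInt(byte[] data, int offset) {
        long result = 0;
        int shift = 0;
        for (int i = offset; i < data.length; i++) {
            int b = data[i] & 0xFF;
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    public static void main(String[] args) {
        // UTF-8 bytes of U+1043 followed by '7' are E1 81 83 37.
        byte[] utf8 = ("\u1043" + "7").getBytes(StandardCharsets.UTF_8);
        // A length prefix of 2 consumes E1 81, leaving 83 37 for the next field:
        // (0x83 & 0x7F) + (0x37 << 7) = 3 + 7040.
        System.out.println(decodeVarInt(utf8, 2)); // prints 7043
    }
}
```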




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
