Hi,

I am facing this problem while using SortedKeyValueFile (from Avro 1.7.3; we forked it in our code to make appends work). Below is the stack trace:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /CHANGELOG/data for DFSClient_NONMAPREDUCE_-523915610_1 on client 127.0.0.1 because current leaseholder is trying to recreate file.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2011)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1853)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2101)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2075)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:435)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:222)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44070)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)

    at org.apache.hadoop.ipc.Client.call(Client.java:1225)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
    at $Proxy21.append(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
    at $Proxy21.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:209)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1339)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1378)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1366)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:255)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:79)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1106)
    at com.abc.kepler.datasink.hdfs.util.SortedKeyValueFile$Writer.<init>(SortedKeyValueFile.java:586)
    at com.abc.kepler.datasink.hdfs.util.ChangeLogUtil.getChangeLogWriter(ChangeLogUtil.java:84)
    at com.abc.kepler.datasink.hdfs.HDFSDataSinkChangeLog.append(HDFSDataSinkChangeLog.java:219)
    at com.abc.kepler.datasink.hdfs.HDFSDataSinkChangesTest.writeDataSingleEntityKeyDefaultLocation(HDFSDataSinkChangesTest.java:1026)
    at com.abc.kepler.datasink.hdfs.HDFSDataSinkChangesTest.javadocExampleTest(HDFSDataSinkChangesTest.java:645)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runners.Suite.runChild(Suite.java:127)
    at org.junit.runners.Suite.runChild(Suite.java:26)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:234)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:133)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:114)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:188)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:166)
    at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:86)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:101)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:74)
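If I am reading the trace right, the NameNode rejects the append because this same DFSClient already holds the lease on /CHANGELOG/data, i.e. an earlier output stream for that file was never closed. A minimal sketch of the situation as I understand it (hypothetical path and code, not our actual source):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeaseConflictRepro {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/lease-repro"); // hypothetical path

        // The first writer acquires the HDFS lease on the file.
        FSDataOutputStream first = fs.create(p);
        first.writeBytes("first");
        // 'first' is never closed, so this client still holds the lease...

        // ...and asking the same client to append should fail with
        // AlreadyBeingCreatedException ("current leaseholder is trying to
        // recreate file"), which is exactly the message in the trace above.
        FSDataOutputStream second = fs.append(p);
        second.close();
        first.close();
    }
}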

After some googling, and following a suggestion on one forum, I tried closing the filesystem and then appending, but as expected that just throws a 'Filesystem closed' exception: FileSystem.get returns a cached instance shared across the JVM, so closing it invalidates it for every other user as well. Below is the code snippet where it fails (at fileSystem.append):

// Open a writer for the data file.
Path dataFilePath = new Path(options.getPath(), DATA_FILENAME);
LOG.debug("Creating writer for avro data file: " + dataFilePath);
mRecordSchema = AvroKeyValue.getSchema(mKeySchema, mValueSchema);
DatumWriter<GenericRecord> datumWriter =
        new GenericDatumWriter<GenericRecord>(mRecordSchema);
OutputStream dataOutputStream;
if (!fileSystem.exists(dataFilePath)) {
    dataOutputStream = fileSystem.create(dataFilePath);
} else {
    dataOutputStream = fileSystem.append(dataFilePath);
}
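Related to the 'Filesystem closed' detour above: if one actually wanted a FileSystem instance that is safe to close, my understanding is that FileSystem.newInstance bypasses the shared cache that FileSystem.get uses. A sketch under that assumption (the class name, helper method, and URI are hypothetical):

import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrivateFsSketch {
    // Create-or-append through a private (uncached) FileSystem instance,
    // so close() here cannot break other code holding the cached instance.
    static void writeWithPrivateFs(Path dataFilePath, byte[] payload) throws Exception {
        Configuration conf = new Configuration();
        FileSystem privateFs =
                FileSystem.newInstance(URI.create("hdfs://localhost:8020"), conf); // hypothetical URI
        try {
            OutputStream out = privateFs.exists(dataFilePath)
                    ? privateFs.append(dataFilePath)
                    : privateFs.create(dataFilePath);
            out.write(payload);
            out.close(); // closing the stream is what releases the lease on the file
        } finally {
            privateFs.close(); // safe: nothing else shares this instance
        }
    }
}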

This is how we create the writer:

public static SortedKeyValueFile.Writer<Long, DataSinkChangeLogContent>
        getChangeLogWriter(Configuration conf, Path baseDir) throws IOException {
    Path changeLogPath = PathBuilder.getChangeLogPath(baseDir);
    InputStream in = ChangeLogUtil.class.getClassLoader()
            .getResourceAsStream("changelogcontent.avsc");
    if (in == null)
        throw new IllegalStateException("inputstream null");
    Schema schema = new Schema.Parser().parse(in);

    SortedKeyValueFile.Writer.Options writerOptions =
            new SortedKeyValueFile.Writer.Options();
    writerOptions.withConfiguration(conf);

    Schema keySchema = Schema.create(Schema.Type.LONG);
    Schema valSchema = ReflectData.get().getSchema(DataSinkChangeLogContent.class);

    writerOptions.withKeySchema(keySchema);
    writerOptions.withValueSchema(schema);
    writerOptions.withPath(changeLogPath);

    return new SortedKeyValueFile.Writer<Long, DataSinkChangeLogContent>(writerOptions);
}
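For completeness, this is roughly how the calling code consumes the writer; changeId and content are hypothetical stand-ins for our actual variables. My working assumption is that each writer must be closed before the next append opens the same file, since close is what releases the lease:

SortedKeyValueFile.Writer<Long, DataSinkChangeLogContent> writer =
        ChangeLogUtil.getChangeLogWriter(conf, baseDir);
try {
    writer.append(changeId, content); // SortedKeyValueFile requires keys in sorted order
} finally {
    writer.close(); // flushes the data and index files and releases the HDFS lease
}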

Below are my project dependencies:

[INFO] +- org.apache.avro:avro:jar:1.7.3:compile
[INFO] +- org.apache.avro:avro-mapred:jar:1.7.3:compile
[INFO] |  +- org.apache.avro:avro-ipc:jar:1.7.3:compile
[INFO] |  |  +- org.apache.velocity:velocity:jar:1.7:compile
[INFO] |  \- org.apache.avro:avro-ipc:jar:tests:1.7.3:compile
[INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.5-cdh4.2.0:compile (version managed from 3.4.2)
[INFO] +- org.apache.hadoop:hadoop-common:jar:2.0.0-cdh4.2.0:provided
[INFO] |  +- org.apache.hadoop:hadoop-annotations:jar:2.0.0-cdh4.2.0:provided (version managed from 2.0.0-cdh4.2.0)
[INFO] |  +- org.apache.commons:commons-math:jar:2.1:provided
[INFO] |  +- org.apache.hadoop:hadoop-auth:jar:2.0.0-cdh4.2.0:provided
[INFO] +- org.apache.hadoop:hadoop-hdfs:jar:2.0.0-cdh4.2.0:provided
[INFO] +- org.apache.hadoop:hadoop-hdfs:jar:tests:2.0.0-cdh4.2.0:test
[INFO] +- org.apache.hadoop:hadoop-common:jar:tests:2.0.0-cdh4.2.0:test

