Hi,
I am hitting a problem while using SortedKeyValueFile (from Avro 1.7.3; we have forked it in our code to make appends work). Below is the stack trace:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /CHANGELOG/data for DFSClient_NONMAPREDUCE_-523915610_1 on client 127.0.0.1 because current leaseholder is trying to recreate file.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2011)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1853)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2101)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2075)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:435)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:222)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44070)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)
    at org.apache.hadoop.ipc.Client.call(Client.java:1225)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
    at $Proxy21.append(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
    at $Proxy21.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:209)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1339)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1378)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1366)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:255)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:79)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1106)
    at com.abc.kepler.datasink.hdfs.util.SortedKeyValueFile$Writer.<init>(SortedKeyValueFile.java:586)
    at com.abc.kepler.datasink.hdfs.util.ChangeLogUtil.getChangeLogWriter(ChangeLogUtil.java:84)
    at com.abc.kepler.datasink.hdfs.HDFSDataSinkChangeLog.append(HDFSDataSinkChangeLog.java:219)
    at com.abc.kepler.datasink.hdfs.HDFSDataSinkChangesTest.writeDataSingleEntityKeyDefaultLocation(HDFSDataSinkChangesTest.java:1026)
    at com.abc.kepler.datasink.hdfs.HDFSDataSinkChangesTest.javadocExampleTest(HDFSDataSinkChangesTest.java:645)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runners.Suite.runChild(Suite.java:127)
    at org.junit.runners.Suite.runChild(Suite.java:26)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:234)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:133)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:114)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:188)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:166)
    at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:86)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:101)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:74)
After some searching, and following a suggestion on a forum, I tried closing the filesystem and then appending, but as expected that just throws a 'Filesystem closed' exception instead. Below is the code snippet where it fails (at fileSystem.append):
    // Open a writer for the data file.
    Path dataFilePath = new Path(options.getPath(), DATA_FILENAME);
    LOG.debug("Creating writer for avro data file: " + dataFilePath);
    mRecordSchema = AvroKeyValue.getSchema(mKeySchema, mValueSchema);
    DatumWriter<GenericRecord> datumWriter =
        new GenericDatumWriter<GenericRecord>(mRecordSchema);
    OutputStream dataOutputStream;
    if (!fileSystem.exists(dataFilePath)) {
      dataOutputStream = fileSystem.create(dataFilePath);
    } else {
      dataOutputStream = fileSystem.append(dataFilePath);
    }
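If I read the exception message right ("current leaseholder is trying to recreate file"), the same DFS client still holds the lease from an earlier stream that was never closed, so the second append from the same client is rejected. The guard I think is needed is "close the previous stream before reopening". A minimal, Hadoop-free sketch of that pattern (AppendGuard and TrackingStream are made-up names standing in for our writer and for fileSystem.create()/append(); this is not our real code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AppendGuard {
    // Records close() so the guard's behavior is observable in a test.
    static class TrackingStream extends ByteArrayOutputStream {
        boolean closed;
        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    private TrackingStream current;

    // Stand-in for fileSystem.create()/append(): always release the previous
    // stream before handing out a new one, mirroring the HDFS rule that one
    // lease holder may have only one open writer per file.
    public TrackingStream open() throws IOException {
        if (current != null) {
            current.close();
        }
        current = new TrackingStream();
        return current;
    }

    public static void main(String[] args) throws IOException {
        AppendGuard guard = new AppendGuard();
        TrackingStream first = guard.open();
        TrackingStream second = guard.open();
        // first was closed before second was opened
        System.out.println(first.closed && !second.closed);  // prints "true"
    }
}
```

In the real code the equivalent would be closing (or reusing) the previous FSDataOutputStream before calling fileSystem.append on the same path.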
This is how we are using the writer:
    public static SortedKeyValueFile.Writer<Long, DataSinkChangeLogContent>
        getChangeLogWriter(Configuration conf, Path baseDir) throws IOException {
      Path changeLogPath = PathBuilder.getChangeLogPath(baseDir);
      InputStream in = ChangeLogUtil.class.getClassLoader()
          .getResourceAsStream("changelogcontent.avsc");
      if (in == null) {
        throw new IllegalStateException("inputstream null");
      }
      Schema schema = new Schema.Parser().parse(in);  // Schema.parse(in) is deprecated
      SortedKeyValueFile.Writer.Options writerOptions =
          new SortedKeyValueFile.Writer.Options();
      writerOptions.withConfiguration(conf);
      Schema keySchema = Schema.create(Schema.Type.LONG);
      Schema valSchema =
          ReflectData.get().getSchema(DataSinkChangeLogContent.class);  // currently unused
      writerOptions.withKeySchema(keySchema);
      writerOptions.withValueSchema(schema);
      writerOptions.withPath(changeLogPath);
      return new SortedKeyValueFile.Writer<Long, DataSinkChangeLogContent>(writerOptions);
    }
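Since getChangeLogWriter opens a fresh writer on every call, another workaround I am considering is caching one writer per changelog path, so we never re-open a file whose lease the same client still holds. A self-contained sketch of that caching pattern (WriterCache and Factory are made-up stand-ins, not SortedKeyValueFile.Writer):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class WriterCache<W> {
    // Stand-in for "open a SortedKeyValueFile.Writer for this path".
    public interface Factory<T> {
        T create(String path);
    }

    private final Map<String, W> writers = new HashMap<>();
    private final Factory<W> factory;

    public WriterCache(Factory<W> factory) {
        this.factory = factory;
    }

    // Returns the cached writer for the path, opening it only once.
    public synchronized W get(String path) {
        return writers.computeIfAbsent(path, factory::create);
    }

    public static void main(String[] args) {
        AtomicInteger opens = new AtomicInteger();
        WriterCache<Object> cache = new WriterCache<>(path -> {
            opens.incrementAndGet();   // counts actual opens
            return new Object();
        });
        Object a = cache.get("/CHANGELOG/data");
        Object b = cache.get("/CHANGELOG/data");
        System.out.println(a == b);      // prints "true": same writer reused
        System.out.println(opens.get()); // prints "1": only one open happened
    }
}
```

The cached writer would still need to be closed on shutdown, but only one open stream per file would ever exist, which is what the HDFS lease check requires.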
Below are my project dependencies:
[INFO] +- org.apache.avro:avro:jar:1.7.3:compile
[INFO] +- org.apache.avro:avro-mapred:jar:1.7.3:compile
[INFO] |  +- org.apache.avro:avro-ipc:jar:1.7.3:compile
[INFO] |  |  +- org.apache.velocity:velocity:jar:1.7:compile
[INFO] |  \- org.apache.avro:avro-ipc:jar:tests:1.7.3:compile
[INFO] |  +- org.apache.zookeeper:zookeeper:jar:3.4.5-cdh4.2.0:compile (version managed from 3.4.2)
[INFO] +- org.apache.hadoop:hadoop-common:jar:2.0.0-cdh4.2.0:provided
[INFO] |  +- org.apache.hadoop:hadoop-annotations:jar:2.0.0-cdh4.2.0:provided (version managed from 2.0.0-cdh4.2.0)
[INFO] |  +- org.apache.commons:commons-math:jar:2.1:provided
[INFO] |  +- org.apache.hadoop:hadoop-auth:jar:2.0.0-cdh4.2.0:provided
[INFO] +- org.apache.hadoop:hadoop-hdfs:jar:2.0.0-cdh4.2.0:provided
[INFO] +- org.apache.hadoop:hadoop-hdfs:jar:tests:2.0.0-cdh4.2.0:test
[INFO] +- org.apache.hadoop:hadoop-common:jar:tests:2.0.0-cdh4.2.0:test
--
View this message in context:
http://apache-avro.679487.n3.nabble.com/AlreadyBeingCreatedException-for-SortedKeyValueFiles-Filesystem-Append-tp4027780.html
Sent from the Avro - Developers mailing list archive at Nabble.com.